Skip to content

Commit

Permalink
feat(node): worker_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Mesteery committed Aug 27, 2021
1 parent dd33422 commit 299b473
Show file tree
Hide file tree
Showing 5 changed files with 483 additions and 42 deletions.
2 changes: 1 addition & 1 deletion node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Deno standard library as it's a compatibility module.
- [ ] vm
- [ ] wasi
- [ ] webcrypto
- [ ] worker_threads
- [x] worker_threads
- [ ] zlib

* [x] node globals _partly_
Expand Down
84 changes: 43 additions & 41 deletions node/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ export interface WrappedFunction extends Function {
listener: GenericFunction;
}

export interface OnceableEventEmitter {
once(event: string | symbol, listener: GenericFunction): unknown;
removeListener(
event: string | symbol,
listener: GenericFunction,
): unknown;
}

export interface OnableEventEmitter {
on(event: string | symbol, listener: GenericFunction): unknown;
removeListener(
event: string | symbol,
listener: GenericFunction,
): unknown;
}

function ensureArray<T>(maybeArray: T[] | T): T[] {
return Array.isArray(maybeArray) ? maybeArray : [maybeArray];
}
Expand Down Expand Up @@ -175,8 +191,8 @@ export class EventEmitter {
* Returns an array listing the events for which the emitter has
* registered listeners.
*/
public eventNames(): [string | symbol] {
return Reflect.ownKeys(this._events) as [string | symbol];
public eventNames(): (string | symbol)[] {
return Reflect.ownKeys(this._events);
}

/**
Expand Down Expand Up @@ -483,51 +499,37 @@ export class EventEmitter {
* will resolve with an array of all the arguments emitted to the given event.
*/
public static once(
emitter: EventEmitter | EventTarget,
name: string,
emitter: OnceableEventEmitter,
name: string | symbol,
// deno-lint-ignore no-explicit-any
): Promise<any[]> {
return new Promise((resolve, reject) => {
if (emitter instanceof EventTarget) {
// EventTarget does not have `error` event semantics like Node
// EventEmitters, we do not listen to `error` events here.
emitter.addEventListener(
name,
(...args) => {
resolve(args);
},
{ once: true, passive: false, capture: false },
);
return;
} else if (emitter instanceof EventEmitter) {
// deno-lint-ignore no-explicit-any
const eventListener = (...args: any[]): void => {
if (errorListener !== undefined) {
emitter.removeListener("error", errorListener);
}
resolve(args);
};
let errorListener: GenericFunction;

// Adding an error listener is not optional because
// if an error is thrown on an event emitter we cannot
// guarantee that the actual event we are waiting will
// be fired. The result could be a silent way to create
// memory or file descriptor leaks, which is something
// we should avoid.
if (name !== "error") {
// deno-lint-ignore no-explicit-any
const eventListener = (...args: any[]): void => {
if (errorListener !== undefined) {
emitter.removeListener("error", errorListener);
}
resolve(args);
errorListener = (err: any): void => {
emitter.removeListener(name, eventListener);
reject(err);
};
let errorListener: GenericFunction;

// Adding an error listener is not optional because
// if an error is thrown on an event emitter we cannot
// guarantee that the actual event we are waiting will
// be fired. The result could be a silent way to create
// memory or file descriptor leaks, which is something
// we should avoid.
if (name !== "error") {
// deno-lint-ignore no-explicit-any
errorListener = (err: any): void => {
emitter.removeListener(name, eventListener);
reject(err);
};

emitter.once("error", errorListener);
}

emitter.once(name, eventListener);
return;
emitter.once("error", errorListener);
}

emitter.once(name, eventListener);
});
}

Expand All @@ -538,7 +540,7 @@ export class EventEmitter {
* emitted event arguments.
*/
public static on(
emitter: EventEmitter,
emitter: OnableEventEmitter,
event: string | symbol,
): AsyncIterable {
// deno-lint-ignore no-explicit-any
Expand Down
29 changes: 29 additions & 0 deletions node/testdata/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {
getEnvironmentData,
isMainThread,
parentPort,
threadId,
workerData,
} from "../worker_threads.ts";
import { once } from "../events.ts";

async function message(expectedMessage: string) {
const [message] = await once(parentPort, "message");
if (message !== expectedMessage) {
// fail test
parentPort.close();
}
}

await message("Hello, how are you my thread?");
parentPort.postMessage("I'm fine!");

parentPort.postMessage({
isMainThread,
threadId,
workerData: Array.isArray(workerData) &&
workerData[workerData.length - 1] instanceof MessagePort
? workerData.slice(0, -1)
: workerData,
envData: [getEnvironmentData("test"), getEnvironmentData(1)],
});
223 changes: 223 additions & 0 deletions node/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/// <reference lib="webworker"/>

import { resolve, toFileUrl } from "../path/mod.ts";
import { notImplemented } from "./_utils.ts";
import { EventEmitter, GenericFunction, once } from "./events.ts";

let environmentData = new Map();
let threads = 0;

interface WorkerOptions {
// only for typings
argv?: unknown[];
env?: Record<string, unknown>;
execArgv?: string[];
stdin?: boolean;
stdout?: boolean;
stderr?: boolean;
trackUnmanagedFds?: boolean;
resourceLimits?: {
maxYoungGenerationSizeMb?: number;
maxOldGenerationSizeMb?: number;
codeRangeSizeMb?: number;
stackSizeMb?: number;
};

eval?: boolean;
transferList?: Transferable[];
workerData?: unknown;
}

const kHandle = Symbol("kHandle");
class _Worker extends EventEmitter {
readonly threadId: number;
readonly resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
maxOldGenerationSizeMb: -1,
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
private readonly [kHandle]: globalThis.Worker;

postMessage: Worker["postMessage"];

constructor(specifier: URL | string, options?: WorkerOptions) {
super();
if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
specifier = toFileUrl(resolve(specifier));
}
const handle = this[kHandle] = new globalThis.Worker(
specifier,
{
...(options || {}),
type: "module",
// unstable
deno: { namespace: true },
},
);
handle.addEventListener(
"error",
(event) => this.emit("error", event.error || event.message),
);
handle.addEventListener(
"messageerror",
(event) => this.emit("messageerror", event.data),
);
handle.addEventListener(
"message",
(event) => this.emit("message", event.data),
);
handle.postMessage({
environmentData,
threadId: (this.threadId = ++threads),
workerData: options?.workerData,
}, options?.transferList || []);
this.postMessage = this[kHandle].postMessage.bind(this[kHandle]);
this.emit("online");
}

terminate() {
this[kHandle].terminate();
this.emit("exit", 0);
}

readonly getHeapSnapshot = notImplemented;
// fake performance
readonly performance = globalThis.performance;
}

export const isMainThread = typeof DedicatedWorkerGlobalScope === "undefined" ||
self instanceof DedicatedWorkerGlobalScope === false;

// fake resourceLimits
export const resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};

let threadId = 0;
let workerData: unknown = null;

// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
Pick<
EventEmitter,
"eventNames" | "listenerCount" | "emit" | "removeAllListeners"
> {
setMaxListeners(n: number): void;
getMaxListeners(): number;
off(eventName: string, listener: GenericFunction): NodeEventTarget;
on(eventName: string, listener: GenericFunction): NodeEventTarget;
once(eventName: string, listener: GenericFunction): NodeEventTarget;
addListener: NodeEventTarget["on"];
removeListener: NodeEventTarget["off"];
}

type ParentPort =
& DedicatedWorkerGlobalScope
& typeof globalThis
& NodeEventTarget;

// deno-lint-ignore no-explicit-any
let parentPort: ParentPort = null as any;

if (!isMainThread) {
// deno-lint-ignore no-explicit-any
const listeners = new WeakMap<GenericFunction, (ev: any) => any>();

parentPort = self as ParentPort;
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
name,
listener,
) {
this.removeEventListener(name, listeners.get(listener)!);
listeners.delete(listener);
return this;
};
parentPort.on = parentPort.addListener = function (
this: ParentPort,
name,
listener,
) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};

// mocks
parentPort.setMaxListeners = () => {};
parentPort.getMaxListeners = () => Infinity;
parentPort.eventNames = () => [""];
parentPort.listenerCount = () => 0;

parentPort.emit = () => notImplemented();
parentPort.removeAllListeners = () => notImplemented();

([{ threadId, workerData, environmentData }] = await once(
parentPort,
"message",
));

// alias
parentPort.addEventListener("offline", () => parentPort.emit("close"));
}

export function getEnvironmentData(key: unknown) {
return environmentData.get(key);
}

export function setEnvironmentData(key: unknown, value?: unknown) {
if (value === undefined) {
environmentData.delete(key);
} else {
environmentData.set(key, value);
}
}

export const MessagePort = globalThis.MessagePort;
export const MessageChannel = globalThis.MessageChannel;
export const BroadcastChannel = globalThis.BroadcastChannel;
export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
export {
_Worker as Worker,
notImplemented as markAsUntransferable,
notImplemented as moveMessagePortToContext,
notImplemented as receiveMessageOnPort,
parentPort,
threadId,
workerData,
};

export default {
markAsUntransferable: notImplemented,
moveMessagePortToContext: notImplemented,
receiveMessageOnPort: notImplemented,
MessagePort,
MessageChannel,
BroadcastChannel,
Worker: _Worker,
getEnvironmentData,
setEnvironmentData,
SHARE_ENV,
threadId,
workerData,
resourceLimits,
parentPort,
isMainThread,
};
Loading

0 comments on commit 299b473

Please sign in to comment.