diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index c0b151a1eac9dea..ae920a58cf4296f 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -97,6 +97,7 @@ port.on('message', (message) => { manifestSrc, manifestURL, publicPort, + hooksPort, workerData, } = message; @@ -111,6 +112,7 @@ port.on('message', (message) => { } require('internal/worker').assignEnvironmentData(environmentData); + require('internal/worker').hooksPort = hooksPort; if (SharedArrayBuffer !== undefined) { // The counter is only passed to the workers created by the main thread, diff --git a/lib/internal/modules/esm/hooks.js b/lib/internal/modules/esm/hooks.js index ba655116a0bb572..742b7bc549f9600 100644 --- a/lib/internal/modules/esm/hooks.js +++ b/lib/internal/modules/esm/hooks.js @@ -35,7 +35,7 @@ const { const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors'); const { URL } = require('internal/url'); const { canParse: URLCanParse } = internalBinding('url'); -const { receiveMessageOnPort } = require('worker_threads'); +const { receiveMessageOnPort, isMainThread } = require('worker_threads'); const { isAnyArrayBuffer, isArrayBufferView, @@ -481,6 +481,7 @@ class HooksProxy { * The InternalWorker instance, which lets us communicate with the loader thread. */ #worker; + #portToHooksThread; /** * The last notification ID received from the worker. 
This is used to detect @@ -499,26 +500,43 @@ class HooksProxy { #isReady = false; constructor() { - const { InternalWorker } = require('internal/worker'); + const { InternalWorker, hooksPort } = require('internal/worker'); MessageChannel ??= require('internal/worker/io').MessageChannel; const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH); this.#lock = new Int32Array(lock); - this.#worker = new InternalWorker(loaderWorkerId, { - stderr: false, - stdin: false, - stdout: false, - trackUnmanagedFds: false, - workerData: { - lock, - }, - }); - this.#worker.unref(); // ! Allows the process to eventually exit. - this.#worker.on('exit', process.exit); + if (isMainThread) { + // main thread is the only one that creates the internal single hooks worker + const { port1: portToHooksThread, port2: portFromHooksThread } = new MessageChannel; + this.#worker = new InternalWorker(loaderWorkerId, { + stderr: false, + stdin: false, + stdout: false, + trackUnmanagedFds: false, + workerData: { + registrationPort: portFromHooksThread, + lock, + }, + transferList: [portFromHooksThread] + }); + this.#worker.unref(); // ! Allows the process to eventually exit. + this.#worker.on('exit', process.exit); + this.#portToHooksThread = this.#worker; + } else { + this.#portToHooksThread = hooksPort; + } } waitForWorker() { + // there is one Hooks instance for each worker thread. But only one of these Hooks instances + // has an InternalWorker. That was the Hooks instance created for the main thread. + // It means for all Hooks instances that are not on the main thread => they are ready because they + // delegate to the single InternalWorker anyway. 
+ if (!isMainThread) { + return; + } + if (!this.#isReady) { const { kIsOnline } = require('internal/worker'); if (!this.#worker[kIsOnline]) { @@ -535,6 +553,22 @@ class HooksProxy { } } + #postMessageToWorker(method, type, transferList, ...args) { + this.waitForWorker(); + MessageChannel ??= require('internal/worker/io').MessageChannel; + const { port1: fromHooksThread, port2: toHooksThread } = new MessageChannel(); + + // Pass work to the worker. + debug(`post ${type} message to worker`, { method, args, transferList }); + const usedTransferList = [toHooksThread]; + if (transferList) { + ArrayPrototypePushApply(usedTransferList, transferList); + } + this.#portToHooksThread.postMessage({ __proto__: null, method, args, lock: this.#lock, port: toHooksThread }, usedTransferList); + + return fromHooksThread; + } + /** * Invoke a remote method asynchronously. * @param {string} method Method to invoke @@ -543,22 +577,7 @@ class HooksProxy { * @returns {Promise} */ async makeAsyncRequest(method, transferList, ...args) { - this.waitForWorker(); - - MessageChannel ??= require('internal/worker/io').MessageChannel; - const asyncCommChannel = new MessageChannel(); - - // Pass work to the worker. 
- debug('post async message to worker', { method, args, transferList }); - const finalTransferList = [asyncCommChannel.port2]; - if (transferList) { - ArrayPrototypePushApply(finalTransferList, transferList); - } - this.#worker.postMessage({ - __proto__: null, - method, args, - port: asyncCommChannel.port2, - }, finalTransferList); + const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, ...args); if (this.#numberOfPendingAsyncResponses++ === 0) { // On the next lines, the main thread will await a response from the worker thread that might @@ -567,7 +586,12 @@ class HooksProxy { // However we want to keep the process alive until the worker thread responds (or until the // event loop of the worker thread is also empty), so we ref the worker until we get all the // responses back. - this.#worker.ref(); + if (this.#worker) { + this.#worker.ref(); + } + else { + this.#portToHooksThread.ref(); + } } let response; @@ -576,18 +600,26 @@ class HooksProxy { await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value; this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); - response = receiveMessageOnPort(asyncCommChannel.port1); + response = receiveMessageOnPort(fromHooksThread); } while (response == null); debug('got async response from worker', { method, args }, this.#lock); if (--this.#numberOfPendingAsyncResponses === 0) { // We got all the responses from the worker, its job is done (until next time). 
- this.#worker.unref(); + if (this.#worker) { + this.#worker.unref(); + } + else { + this.#portToHooksThread.unref(); + } + } + + if (response.message.status === 'exit') { + process.exit(response.message.body); } - const body = this.#unwrapMessage(response); - asyncCommChannel.port1.close(); - return body; + fromHooksThread.close(); + return this.#unwrapMessage(response); } /** @@ -598,11 +630,7 @@ class HooksProxy { * @returns {any} */ makeSyncRequest(method, transferList, ...args) { - this.waitForWorker(); - - // Pass work to the worker. - debug('post sync message to worker', { method, args, transferList }); - this.#worker.postMessage({ __proto__: null, method, args }, transferList); + const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, ...args); let response; do { @@ -611,7 +639,7 @@ class HooksProxy { AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId); this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); - response = this.#worker.receiveMessageSync(); + response = receiveMessageOnPort(fromHooksThread); } while (response == null); debug('got sync response from worker', { method, args }); if (response.message.status === 'never-settle') { @@ -619,6 +647,7 @@ class HooksProxy { } else if (response.message.status === 'exit') { process.exit(response.message.body); } + fromHooksThread.close(); return this.#unwrapMessage(response); } diff --git a/lib/internal/modules/esm/loader.js b/lib/internal/modules/esm/loader.js index df7a26c9337c398..512783f68d0109a 100644 --- a/lib/internal/modules/esm/loader.js +++ b/lib/internal/modules/esm/loader.js @@ -40,6 +40,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap'); const { urlToFilename, } = require('internal/modules/helpers'); +const { isMainThread } = require('worker_threads'); let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer; /** @@ -594,10 +595,11 @@ class 
CustomizedModuleLoader { */ constructor() { getHooksProxy(); + _hasCustomizations = true; } /** - * Register some loader specifier. + * Register a loader specifier. * @param {string} originalSpecifier The specified URL path of the loader to * be registered. * @param {string} parentURL The parent URL from where the loader will be @@ -608,7 +610,11 @@ class CustomizedModuleLoader { * @returns {{ format: string, url: URL['href'] }} */ register(originalSpecifier, parentURL, data, transferList) { - return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); + if (isMainThread) { + // only the main thread has a Hooks instance with worker thread. All other Worker threads + // delegate their hooks to the HooksThread of the main thread. + return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data); + } } /** @@ -617,7 +623,7 @@ class CustomizedModuleLoader { * be resolved. * @param {string} [parentURL] The URL path of the module's parent. * @param {ImportAttributes} importAttributes Attributes from the import - * statement or expression. + * statement or expression. 
* @returns {{ format: string, url: URL['href'] }} */ resolve(originalSpecifier, parentURL, importAttributes) { @@ -706,6 +712,12 @@ function getHooksProxy() { return hooksProxy; } +let _hasCustomizations = false; +function hasCustomizations() { + return _hasCustomizations; +} + + let cascadedLoader; /** @@ -767,6 +779,7 @@ function register(specifier, parentURL = undefined, options) { module.exports = { createModuleLoader, + hasCustomizations, getHooksProxy, getOrInitializeCascadedLoader, register, diff --git a/lib/internal/modules/esm/worker.js b/lib/internal/modules/esm/worker.js index 311d77fb0993841..587ba6ccd8e0a31 100644 --- a/lib/internal/modules/esm/worker.js +++ b/lib/internal/modules/esm/worker.js @@ -87,7 +87,7 @@ function wrapMessage(status, body) { * @param {(err: Error, origin?: string) => void} errorHandler - The function to use for uncaught exceptions. * @returns {Promise} A promise that resolves when the worker thread has been initialized. */ -async function customizedModuleWorker(lock, syncCommPort, errorHandler) { +async function customizedModuleWorker(lock, syncCommPort, registrationPort, errorHandler) { let hooks; let initializationError; let hasInitializationError = false; @@ -97,7 +97,19 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { // so it can detect the exit event. const { exit } = process; process.exit = function(code) { + if (hooks) { + for (const registeredPort of allThreadRegisteredHandlerPorts) { + registeredPort.postMessage(wrapMessage('exit', code ?? process.exitCode)); + } + for (const { port, lock: operationLock } of unsettledResponsePorts) { + port.postMessage(wrapMessage('exit', code ?? process.exitCode)); + // wake all threads that have pending operations. Is that needed??? + AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); + } + } syncCommPort.postMessage(wrapMessage('exit', code ?? 
process.exitCode)); + registrationPort.postMessage(wrapMessage('exit', code ?? process.exitCode)); AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); return ReflectApply(exit, this, arguments); @@ -116,6 +128,7 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { } syncCommPort.on('message', handleMessage); + registrationPort.on('message', handleMessage); if (hasInitializationError) { syncCommPort.postMessage(wrapMessage('error', initializationError)); @@ -145,8 +158,11 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { const unsettledResponsePorts = new SafeSet(); process.on('beforeExit', () => { - for (const port of unsettledResponsePorts) { + for (const { port, lock: operationLock } of unsettledResponsePorts) { port.postMessage(wrapMessage('never-settle')); + // wake all threads that have pending operations. Is that needed??? + AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); } unsettledResponsePorts.clear(); @@ -164,6 +180,19 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { setImmediate(() => {}); }); + const allThreadRegisteredHandlerPorts = [] + function registerHandler(toWorkerThread) { + toWorkerThread.on('message', handleMessage); + allThreadRegisteredHandlerPorts.push(toWorkerThread); + } + + function getMessageHandler(method) { + if (method === '#registerWorkerClient') { + return registerHandler; + } + return hooks[method]; + } + /** * Handles incoming messages from the main thread or other workers. * @param {object} options - The options object. @@ -171,38 +200,41 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { * @param {Array} options.args - The arguments to pass to the method. * @param {MessagePort} options.port - The message port to use for communication. 
*/ - async function handleMessage({ method, args, port }) { + async function handleMessage({ method, args, port, lock: msgLock }) { // Each potential exception needs to be caught individually so that the correct error is sent to // the main thread. let hasError = false; let shouldRemoveGlobalErrorHandler = false; - assert(typeof hooks[method] === 'function'); + assert(typeof getMessageHandler(method) === 'function'); if (port == null && !hasUncaughtExceptionCaptureCallback()) { // When receiving sync messages, we want to unlock the main thread when there's an exception. process.on('uncaughtException', errorHandler); shouldRemoveGlobalErrorHandler = true; } + const usedLock = msgLock ?? lock; // We are about to yield the execution with `await ReflectApply` below. In case the code // following the `await` never runs, we remove the message handler so the `beforeExit` event // can be triggered. syncCommPort.off('message', handleMessage); + registrationPort.off('message', handleMessage); // We keep checking for new messages to not miss any. clearImmediate(immediate); immediate = setImmediate(checkForMessages).unref(); - unsettledResponsePorts.add(port ?? syncCommPort); + const unsettledActionData = { port: port ?? syncCommPort, lock: usedLock }; + unsettledResponsePorts.add(unsettledActionData); let response; try { - response = await ReflectApply(hooks[method], hooks, args); + response = await ReflectApply(getMessageHandler(method), hooks, args); } catch (exception) { hasError = true; response = exception; } - unsettledResponsePorts.delete(port ?? syncCommPort); + unsettledResponsePorts.delete(unsettledActionData); // Send the method response (or exception) to the main thread. try { @@ -215,8 +247,8 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { (port ?? 
syncCommPort).postMessage(wrapMessage('error', exception)); } - AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); - AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); + AtomicsAdd(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); + AtomicsNotify(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION); if (shouldRemoveGlobalErrorHandler) { process.off('uncaughtException', errorHandler); } @@ -235,7 +267,8 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) { * @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread. */ module.exports = function setupModuleWorker(workerData, syncCommPort) { - const lock = new Int32Array(workerData.lock); + const { registrationPort, lock: lockSM } = workerData + const lock = new Int32Array(lockSM); /** * Handles errors that occur in the worker thread. @@ -257,7 +290,7 @@ module.exports = function setupModuleWorker(workerData, syncCommPort) { } return PromisePrototypeThen( - customizedModuleWorker(lock, syncCommPort, errorHandler), + customizedModuleWorker(lock, syncCommPort, registrationPort, errorHandler), undefined, errorHandler, ); diff --git a/lib/internal/worker.js b/lib/internal/worker.js index b58cbe56d01703d..e1624e7f5a88f99 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -259,6 +259,12 @@ class Worker extends EventEmitter { ...new SafeArrayIterator(options.transferList)); this[kPublicPort] = port1; + const { port1: toWorkerThread, port2: toHooksThread } = new MessageChannel(); + if (!isInternal) { + // this is not an internal hooks thread => it needs a channel to the hooks thread: + // - send it one side of a channel here + transferList.push(toHooksThread); + } ArrayPrototypeForEach(['message', 'messageerror'], (event) => { this[kPublicPort].on(event, (message) => this.emit(event, message)); }); @@ -273,6 +279,7 @@ class Worker extends EventEmitter { workerData: options.workerData, environmentData, publicPort: port2, + hooksPort: 
!isInternal ? toHooksThread : undefined, manifestURL: getOptionValue('--experimental-policy') ? require('internal/process/policy').url : null, @@ -281,6 +288,14 @@ class Worker extends EventEmitter { null, hasStdin: !!options.stdin, }, transferList); + const loaderModule = require('internal/modules/esm/loader'); + const hasCustomizations = loaderModule.hasCustomizations(); + if (!isInternal && hasCustomizations) { + // - send the second side of the channel to the hooks thread + loaderModule.getHooksProxy().makeAsyncRequest( + '#registerWorkerClient', [toWorkerThread], toWorkerThread, this.threadId + ); + } // Use this to cache the Worker's loopStart value once available. this[kLoopStartTime] = -1; this[kIsOnline] = false; @@ -539,6 +554,7 @@ module.exports = { kIsOnline, isMainThread, SHARE_ENV, + hooksPort: undefined, resourceLimits: !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, setEnvironmentData, diff --git a/test/es-module/test-esm-loader-threads.mjs b/test/es-module/test-esm-loader-threads.mjs new file mode 100644 index 000000000000000..6e0f37e397de04d --- /dev/null +++ b/test/es-module/test-esm-loader-threads.mjs @@ -0,0 +1,40 @@ +import { spawnPromisified } from '../common/index.mjs'; +import * as fixtures from '../common/fixtures.mjs'; +import { strictEqual } from 'node:assert'; +import { execPath } from 'node:process'; +import { describe, it } from 'node:test'; + +describe('off-thread hooks', { concurrency: true }, () => { + it('uses only one hooks thread to support multiple application threads', async () => { + const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [ + '--no-warnings', + '--import', + `data:text/javascript,${encodeURIComponent(` + import { register } from 'node:module'; + register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-log.mjs'))}); + `)}`, + fixtures.path('es-module-loaders/workers-spawned.mjs'), + ]); + + strictEqual(stderr, ''); + strictEqual(stdout.split('\n').filter(line => 
line.startsWith('initialize')).length, 1); + strictEqual(stdout.split('\n').filter(line => line === 'foo').length, 2); + strictEqual(stdout.split('\n').filter(line => line === 'bar').length, 4); + /*** + * resolve/load calls: + * 1x main script: test/fixtures/es-module-loaders/workers-spawned.mjs + * 3x worker_threads + * => 1x test/fixtures/es-module-loaders/worker-log.mjs + * 2x test/fixtures/es-module-loaders/worker-log-again.mjs => once per worker-log.mjs Worker instance + * 2x test/fixtures/es-module-loaders/worker-log.mjs => once per worker-log.mjs Worker instance + * 4x test/fixtures/es-module-loaders/worker-log-again.mjs => 2x for each worker-log + * 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs + * =========================== + * 16 calls to resolve + 16 calls to load hook for the registered custom loader + */ + strictEqual(stdout.split('\n').filter(line => line.startsWith('hooked resolve')).length, 16); + strictEqual(stdout.split('\n').filter(line => line.startsWith('hooked load')).length, 16); + strictEqual(code, 0); + strictEqual(signal, null); + }); +}); diff --git a/test/fixtures/es-module-loaders/hooks-log.mjs b/test/fixtures/es-module-loaders/hooks-log.mjs new file mode 100644 index 000000000000000..2d2512281e8bd54 --- /dev/null +++ b/test/fixtures/es-module-loaders/hooks-log.mjs @@ -0,0 +1,19 @@ +import { writeFileSync } from 'node:fs'; + +let initializeCount = 0; +let resolveCount = 0; +let loadCount = 0; + +export function initialize() { + writeFileSync(1, `initialize ${++initializeCount}\n`); +} + +export function resolve(specifier, context, next) { + writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier}\n`); + return next(specifier, context); +} + +export function load(url, context, next) { + writeFileSync(1, `hooked load ${++loadCount} ${url}\n`); + return next(url, context); +} diff --git a/test/fixtures/es-module-loaders/worker-log-again.mjs b/test/fixtures/es-module-loaders/worker-log-again.mjs new 
file mode 100644 index 000000000000000..2969edc8dac382d --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log-again.mjs @@ -0,0 +1,3 @@ +import { bar } from './module-named-exports.mjs'; + +console.log(bar); diff --git a/test/fixtures/es-module-loaders/worker-log.mjs b/test/fixtures/es-module-loaders/worker-log.mjs new file mode 100644 index 000000000000000..13290c37d07104b --- /dev/null +++ b/test/fixtures/es-module-loaders/worker-log.mjs @@ -0,0 +1,9 @@ +import { Worker } from 'worker_threads'; +import { foo } from './module-named-exports.mjs'; + +const workerURL = new URL('./worker-log-again.mjs', import.meta.url); +console.log(foo); + +// Spawn two workers +new Worker(workerURL); +new Worker(workerURL); diff --git a/test/fixtures/es-module-loaders/workers-spawned.mjs b/test/fixtures/es-module-loaders/workers-spawned.mjs new file mode 100644 index 000000000000000..439847656fe13e4 --- /dev/null +++ b/test/fixtures/es-module-loaders/workers-spawned.mjs @@ -0,0 +1,7 @@ +import { Worker } from 'worker_threads'; + +const workerURL = new URL('./worker-log.mjs', import.meta.url); + +// Spawn two workers +new Worker(workerURL); +new Worker(workerURL);