Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Move ESM loaders to worker thread #31229

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions lib/internal/bootstrap/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ function prepareMainThreadExecution(expandArgv1 = false) {


setupDebugEnv();
// Load policy from disk and parse it.
initializePolicy();

// Print stack trace on `SIGINT` if option `--trace-sigint` presents.
setupStacktracePrinterOnSigint();
Expand All @@ -45,9 +47,6 @@ function prepareMainThreadExecution(expandArgv1 = false) {
// process.disconnect().
setupChildProcessIpcChannel();

// Load policy from disk and parse it.
initializePolicy();

// If this is a worker in cluster mode, start up the communication
// channel. This needs to be done before any user code gets executed
// (including preload modules).
Expand All @@ -56,7 +55,32 @@ function prepareMainThreadExecution(expandArgv1 = false) {
initializeDeprecations();
initializeWASI();
initializeCJSLoader();
initializeESMLoader();

function startLoaders() {
const loaderHREF = getOptionValue('--experimental-loader');
if (!loaderHREF) return null;
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');
// DO NOT ADD CODE ABOVE THIS LINE
// THIS SHOULD POST THE PORTS ASAP TO THE PARENT
const {
port1: outsideBelowPort,
port2: insideBelowPort
} = new MessageChannel();
outsideBelowPort.name = 'outsideBelowPort';
InternalWorker('internal/modules/esm/worker', {
// stdout: true,
// strerr: true,
transferList: [ insideBelowPort ],
workerData: {
loaderHREF,
insideBelowPort,
insideAbovePort: null
}
}).unref();
return outsideBelowPort;
}
initializeESMLoader(startLoaders());

const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
Expand Down Expand Up @@ -332,12 +356,12 @@ function initializeClusterIPC() {
}
}

const { pathToFileURL, URL } = require('url');
function initializePolicy() {
const experimentalPolicy = getOptionValue('--experimental-policy');
if (experimentalPolicy) {
process.emitWarning('Policies are experimental.',
'ExperimentalWarning');
const { pathToFileURL, URL } = require('url');
// URL here as it is slightly different parsing
// no bare specifiers for now
let manifestURL;
Expand Down Expand Up @@ -396,14 +420,16 @@ function initializeCJSLoader() {
require('internal/modules/run_main').executeUserEntryPoint;
}

function initializeESMLoader() {
function initializeESMLoader(bottomLoader) {
// Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
internalBinding('module_wrap').callbackMap = new SafeWeakMap();

const {
setImportModuleDynamicallyCallback,
setInitializeImportMetaObjectCallback
} = internalBinding('module_wrap');
const esmLoader = require('internal/modules/esm/loader');
esmLoader.initUserLoaders(bottomLoader);
const esm = require('internal/process/esm_loader');
// Setup per-isolate callbacks that locate data or callbacks that we keep
// track of for different ESM modules.
Expand Down
25 changes: 16 additions & 9 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ port.on('message', (message) => {
cwdCounter,
filename,
doEval,
internal,
workerData,
loaderPort,
publicPort,
manifestSrc,
manifestURL,
Expand All @@ -113,14 +115,16 @@ port.on('message', (message) => {
initializeDeprecations();
initializeWASI();
initializeCJSLoader();
initializeESMLoader();

const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
initializeFrozenIntrinsics();
if (argv !== undefined) {
process.argv = process.argv.concat(argv);
initializeESMLoader(loaderPort);

if (!internal) {
const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
initializeFrozenIntrinsics();
if (argv !== undefined) {
process.argv = process.argv.concat(argv);
}
}
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;
Expand All @@ -147,7 +151,9 @@ port.on('message', (message) => {
debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.postMessage({ type: UP_AND_RUNNING });
if (doEval) {
if (internal) {
require(filename);
} else if (doEval) {
const { evalScript } = require('internal/process/execution');
const name = '[worker eval]';
// This is necessary for CJS module compilation.
Expand All @@ -164,6 +170,7 @@ port.on('message', (message) => {
// runMain here might be monkey-patched by users in --require.
// XXX: the monkey-patchability here should probably be deprecated.
process.argv.splice(1, 0, filename);
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
}
} else if (message.type === STDIO_PAYLOAD) {
Expand Down
101 changes: 101 additions & 0 deletions lib/internal/modules/esm/ipc_types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/* eslint-disable */
let nextId = 1;
function getNewId() {
const id = nextId;
nextId++;
return id;
}

class LoaderWorker {
resolveImportURL() {
throw new TypeError('Not implemented');
}

getFormat() {
throw new TypeError('Not implemented');
}

getSource() {
throw new TypeError('Not implemented');
}

transformSource() {
throw new TypeError('Not implemented');
}

addBelowPort() {
throw new TypeError('Not implemented');
}
}

class RemoteLoaderWorker {
constructor(port) {
this._port = port;
this._pending = new Map();

this._port.on('message', this._handleMessage.bind(this));
this._port.unref();
}

_handleMessage({ id, error, result }) {
if (!this._pending.has(id)) return;
const { resolve, reject } = this._pending.get(id);
this._pending.delete(id);
if (!this._pending.size) {
this._port.unref();
}

if (error) {
reject(error);
} else {
resolve(result);
}
}

_send(method, params, transferList) {
const id = getNewId();
const message = { id, method, params };
return new Promise((resolve, reject) => {
this._pending.set(id, { resolve, reject });
this._port.postMessage(message, transferList);
this._port.ref();
});
}
}

for (const method of Object.getOwnPropertyNames(LoaderWorker.prototype)) {
Object.defineProperty(RemoteLoaderWorker.prototype, method, {
configurable: true,
enumerable: false,
value(data, transferList) {
return this._send(method, data, transferList);
},
});
}

function connectIncoming(port, instance) {
port.on('message', async ({ id, method, params }) => {
if (!id) return;

let result = null;
let error = null;
try {
if (typeof instance[method] !== 'function') {
throw new TypeError(`No such RPC method: ${method}`);
}
result = await instance[method](params);
} catch (e) {
error = e;
}
port.postMessage({ id, error, result });
});

return instance;
}

module.exports = {
connectIncoming,
getNewId,
LoaderWorker,
RemoteLoaderWorker,
};
Loading