Skip to content

Commit

Permalink
module: move esm loader hooks to worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
bfarias-godaddy authored and jkrems committed May 28, 2020
1 parent d12d5ef commit c4f2cf9
Show file tree
Hide file tree
Showing 21 changed files with 574 additions and 156 deletions.
38 changes: 32 additions & 6 deletions lib/internal/bootstrap/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ function prepareMainThreadExecution(expandArgv1 = false) {


setupDebugEnv();
// Load policy from disk and parse it.
initializePolicy();

// Print stack trace on `SIGINT` if option `--trace-sigint` presents.
setupStacktracePrinterOnSigint();
Expand All @@ -45,9 +47,6 @@ function prepareMainThreadExecution(expandArgv1 = false) {
// process.disconnect().
setupChildProcessIpcChannel();

// Load policy from disk and parse it.
initializePolicy();

// If this is a worker in cluster mode, start up the communication
// channel. This needs to be done before any user code gets executed
// (including preload modules).
Expand All @@ -56,7 +55,32 @@ function prepareMainThreadExecution(expandArgv1 = false) {
initializeDeprecations();
initializeWASI();
initializeCJSLoader();
initializeESMLoader();

function startLoaders() {
const loaderHREF = getOptionValue('--experimental-loader');
if (!loaderHREF) return null;
const { InternalWorker } = require('internal/worker');
const { MessageChannel } = require('internal/worker/io');
// DO NOT ADD CODE ABOVE THIS LINE
// THIS SHOULD POST THE PORTS ASAP TO THE PARENT
const {
port1: outsideBelowPort,
port2: insideBelowPort
} = new MessageChannel();
outsideBelowPort.name = 'outsideBelowPort';
InternalWorker('internal/modules/esm/worker', {
// stdout: true,
// strerr: true,
transferList: [ insideBelowPort ],
workerData: {
loaderHREF,
insideBelowPort,
insideAbovePort: null
}
}).unref();
return outsideBelowPort;
}
initializeESMLoader(startLoaders());

const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
Expand Down Expand Up @@ -332,12 +356,12 @@ function initializeClusterIPC() {
}
}

const { pathToFileURL, URL } = require('url');
function initializePolicy() {
const experimentalPolicy = getOptionValue('--experimental-policy');
if (experimentalPolicy) {
process.emitWarning('Policies are experimental.',
'ExperimentalWarning');
const { pathToFileURL, URL } = require('url');
// URL here as it is slightly different parsing
// no bare specifiers for now
let manifestURL;
Expand Down Expand Up @@ -396,14 +420,16 @@ function initializeCJSLoader() {
require('internal/modules/run_main').executeUserEntryPoint;
}

function initializeESMLoader() {
function initializeESMLoader(bottomLoader) {
// Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
internalBinding('module_wrap').callbackMap = new SafeWeakMap();

const {
setImportModuleDynamicallyCallback,
setInitializeImportMetaObjectCallback
} = internalBinding('module_wrap');
const esmLoader = require('internal/modules/esm/loader');
esmLoader.initUserLoaders(bottomLoader);
const esm = require('internal/process/esm_loader');
// Setup per-isolate callbacks that locate data or callbacks that we keep
// track of for different ESM modules.
Expand Down
25 changes: 16 additions & 9 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ port.on('message', (message) => {
cwdCounter,
filename,
doEval,
internal,
workerData,
loaderPort,
publicPort,
manifestSrc,
manifestURL,
Expand All @@ -113,14 +115,16 @@ port.on('message', (message) => {
initializeDeprecations();
initializeWASI();
initializeCJSLoader();
initializeESMLoader();

const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
initializeFrozenIntrinsics();
if (argv !== undefined) {
process.argv = process.argv.concat(argv);
initializeESMLoader(loaderPort);

if (!internal) {
const CJSLoader = require('internal/modules/cjs/loader');
assert(!CJSLoader.hasLoadedAnyUserCJSModule);
loadPreloadModules();
initializeFrozenIntrinsics();
if (argv !== undefined) {
process.argv = process.argv.concat(argv);
}
}
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;
Expand All @@ -147,7 +151,9 @@ port.on('message', (message) => {
debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.postMessage({ type: UP_AND_RUNNING });
if (doEval) {
if (internal) {
require(filename);
} else if (doEval) {
const { evalScript } = require('internal/process/execution');
const name = '[worker eval]';
// This is necessary for CJS module compilation.
Expand All @@ -164,6 +170,7 @@ port.on('message', (message) => {
// runMain here might be monkey-patched by users in --require.
// XXX: the monkey-patchability here should probably be deprecated.
process.argv.splice(1, 0, filename);
const CJSLoader = require('internal/modules/cjs/loader');
CJSLoader.Module.runMain(filename);
}
} else if (message.type === STDIO_PAYLOAD) {
Expand Down
99 changes: 99 additions & 0 deletions lib/internal/modules/esm/ipc_types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* eslint-disable */
let nextId = 1;
function getNewId() {
const id = nextId;
nextId++;
return id;
}

class LoaderWorker {
resolveImportURL() {
throw new TypeError('Not implemented');
}

getFormat() {
throw new TypeError('Not implemented');
}

getSource() {
throw new TypeError('Not implemented');
}

transformSource() {
throw new TypeError('Not implemented');
}

addBelowPort() {
throw new TypeError('Not implemented');
}
}

class RemoteLoaderWorker {
constructor(port) {
this._port = port;
this._pending = new Map();

this._port.on('message', this._handleMessage.bind(this));
this._port.unref();
}

_handleMessage({ id, error, result }) {
if (!this._pending.has(id)) return;
const { resolve, reject } = this._pending.get(id);
this._pending.delete(id);
this._port.unref();

if (error) {
reject(error);
} else {
resolve(result);
}
}

_send(method, params, transferList) {
const id = getNewId();
const message = { id, method, params };
return new Promise((resolve, reject) => {
this._pending.set(id, { resolve, reject });
this._port.postMessage(message, transferList);
this._port.ref();
});
}
}

for (const method of Object.getOwnPropertyNames(LoaderWorker.prototype)) {
Object.defineProperty(RemoteLoaderWorker.prototype, method, {
configurable: true,
enumerable: false,
value(data, transferList) {
return this._send(method, data, transferList);
},
});
}

function connectIncoming(port, instance) {
port.on('message', async ({ id, method, params }) => {
if (!id) return;

let result = null;
let error = null;
try {
if (typeof instance[method] !== 'function') {
throw new TypeError(`No such RPC method: ${method}`);
}
result = await instance[method](params);
} catch (e) {
error = e;
}
port.postMessage({ id, error, result });
});

return instance;
}

module.exports = {
connectIncoming,
getNewId,
LoaderWorker,
RemoteLoaderWorker,
};
Loading

0 comments on commit c4f2cf9

Please sign in to comment.