Skip to content

Commit

Permalink
worker: move worker thread setup code into the main script
Browse files Browse the repository at this point in the history
This patch directly inlines `createMessageHandler()` and
`createWorkerFatalExeception()` in the new
`lib/internal/main/worker_thread.js` since the implementation
of the two methods are related to the execution flow of
workers.

PR-URL: #25667
Reviewed-By: Anna Henningsen <[email protected]>
Backport-PR-URL: #26036
  • Loading branch information
joyeecheung authored and targos committed Feb 10, 2019
1 parent e6a4fb6 commit 8db6b8a
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 120 deletions.
116 changes: 102 additions & 14 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,118 @@ const {
} = require('internal/bootstrap/pre_execution');

const {
getEnvMessagePort,
threadId
threadId,
getEnvMessagePort
} = internalBinding('worker');

const {
createMessageHandler,
createWorkerFatalExeception
} = require('internal/process/worker_thread_only');
messageTypes: {
// Messages that may be received by workers
LOAD_SCRIPT,
// Messages that may be posted from workers
UP_AND_RUNNING,
ERROR_MESSAGE,
COULD_NOT_SERIALIZE_ERROR,
// Messages that may be either received or posted
STDIO_PAYLOAD,
STDIO_WANTS_MORE_DATA,
},
kStdioWantsMoreDataCallback
} = require('internal/worker/io');

const {
fatalException: originalFatalException
} = require('internal/process/execution');

const publicWorker = require('worker_threads');
const debug = require('util').debuglog('worker');
debug(`[${threadId}] is setting up worker child environment`);

function prepareUserCodeExecution() {
initializeClusterIPC();
initializeESMLoader();
loadPreloadModules();
}
debug(`[${threadId}] is setting up worker child environment`);

// Set up the message port and start listening
const port = getEnvMessagePort();
port.on('message', createMessageHandler(port, prepareUserCodeExecution));
port.start();

port.on('message', (message) => {
if (message.type === LOAD_SCRIPT) {
const {
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
initializeClusterIPC();
initializeESMLoader();
loadPreloadModules();
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
process.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
process[stream].push(chunk, encoding);
return;
} else if (message.type === STDIO_WANTS_MORE_DATA) {
const { stream } = message;
process[stream][kStdioWantsMoreDataCallback]();
return;
}

require('assert').fail(`Unknown worker message type ${message.type}`);
});

// Overwrite fatalException
process._fatalException = createWorkerFatalExeception(port);
process._fatalException = (error) => {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
const { serializeError } = require('internal/error-serdes');
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });

const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();

process.exit();
}
};

markBootstrapComplete();

port.start();
107 changes: 1 addition & 106 deletions lib/internal/process/worker_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort,
threadId
getEnvMessagePort
} = internalBinding('worker');

const {
messageTypes,
kStdioWantsMoreDataCallback,
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
Expand All @@ -18,15 +15,6 @@ const {
const {
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
} = require('internal/errors');

let debuglog;
function debug(...args) {
if (!debuglog) {
debuglog = require('util').debuglog('worker');
}
return debuglog(...args);
}

const workerStdio = {};

function initializeWorkerStdio() {
Expand All @@ -43,97 +31,6 @@ function initializeWorkerStdio() {
};
}

function createMessageHandler(port, prepareUserCodeExecution) {
const publicWorker = require('worker_threads');

return function(message) {
if (message.type === messageTypes.LOAD_SCRIPT) {
const {
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
if (manifestSrc) {
require('internal/process/policy').setup(manifestSrc, manifestURL);
}
prepareUserCodeExecution();
publicWorker.parentPort = publicPort;
publicWorker.workerData = workerData;

if (!hasStdin)
workerStdio.stdin.push(null);

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${eval}) at cwd = ${process.cwd()}`);
port.unref();
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
if (doEval) {
const { evalScript } = require('internal/process/execution');
evalScript('[worker eval]', filename);
} else {
process.argv[1] = filename; // script filename
require('module').runMain();
}
return;
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message;
workerStdio[stream].push(chunk, encoding);
return;
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
const { stream } = message;
workerStdio[stream][kStdioWantsMoreDataCallback]();
return;
}

require('assert').fail(`Unknown worker message type ${message.type}`);
};
}

// XXX(joyeecheung): this has to be returned as an anonymous function
// wrapped in a closure, see the comment of the original
// process._fatalException in lib/internal/process/execution.js
function createWorkerFatalExeception(port) {
const {
fatalException: originalFatalException
} = require('internal/process/execution');

return (error) => {
debug(`[${threadId}] gets fatal exception`);
let caught = false;
try {
caught = originalFatalException.call(this, error);
} catch (e) {
error = e;
}
debug(`[${threadId}] fatal exception caught = ${caught}`);

if (!caught) {
let serialized;
try {
const { serializeError } = require('internal/error-serdes');
serialized = serializeError(error);
} catch {}
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
if (serialized)
port.postMessage({
type: messageTypes.ERROR_MESSAGE,
error: serialized
});
else
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });

const { clearAsyncIdStack } = require('internal/async_hooks');
clearAsyncIdStack();

process.exit();
}
};
}

// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function umask(mask) {
Expand All @@ -150,7 +47,5 @@ function wrapProcessMethods(binding) {

module.exports = {
initializeWorkerStdio,
createMessageHandler,
createWorkerFatalExeception,
wrapProcessMethods
};

0 comments on commit 8db6b8a

Please sign in to comment.