Spawn a new worker when an old one is stopped #62
await wait(200);

// then
t.is(workerNodes.workersQueue.storage.length, 0);
t.is(workerNodes.workersQueue.storage.filter(worker => worker.id === executingWorkerId).length, 0);
This assertion wasn't entirely accurate IMO, since
should kill the worker that was involved in processing the task
The previous assertion checked that the pool is empty, while the correct assertion is that the worker in question was replaced.
hi @yurynix, LGTM! will be happy to release next version - but please check out failing tests ;)
Thank you for taking the time to review this @bgalek 🙏 The tests were passing on my machine, so I think the problem is that on CI it takes more time to spawn the workers back. I've added a utility function
It seems there's an issue indeed with node@11 (can reproduce locally), the tests take much longer to execute than on newer node versions.
On my machine (arm64 macOS 13.4 (22F66)) it reproduces:
And on node@11:
@yurynix I've updated supported versions to include the most popular ones, rebase your branch with master ;)
@bgalek Sure thing 🙏, nevertheless I want to understand what happened with node@11, there might be an actual issue it exposed.
@yurynix totally agree - we should get to the bottom of this to know what was the cause ;)
Ok, so it seems that on all versions before node v12.6.3, the code change proposed in this PR will fail. I'm unsure which one of those actually fixed the issue:
This happens because
There are 2 solutions that can work:
Solution 1:
diff --git a/lib/pool.js b/lib/pool.js
index c75bbf5..575add6 100644
--- a/lib/pool.js
+++ b/lib/pool.js
@@ -120,7 +120,11 @@ class WorkerNodes extends EventEmitter {
this.workersQueue.remove(worker);
- if (this.canStartWorker()) this.startWorker();
+ if (!worker.isFailedToInit()) {
+ if (this.canStartWorker()) this.startWorker();
+ } else {
+ console.warn(`Worker exited before finished initialization!`);
+ }
this.processQueue();
}
diff --git a/lib/worker.js b/lib/worker.js
index 9512892..c0be0a3 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -20,6 +20,7 @@ class Worker extends EventEmitter {
this.endurance = endurance;
this.isTerminating = false;
this.isProcessAlive = false;
+ this.failedToInit = false;
const process = this.process = new WorkerProcess(srcFilePath, { stopTimeout, asyncWorkerInitialization, resourceLimits });
@@ -35,6 +36,7 @@ class Worker extends EventEmitter {
process.once('exit', code => {
this.exitCode = code;
+ this.failedToInit = !this.isProcessAlive;
this.isProcessAlive = false;
this.emit('exit', code);
});
@@ -91,6 +93,14 @@ class Worker extends EventEmitter {
return this.activeCalls > 0;
}
+ /**
+ * Worker process exited before initialization
+ * @returns {boolean}
+ */
+ isFailedToInit() {
+ return this.failedToInit;
+ }
+
/**
*
* @returns {boolean}
Solution 2 (which I committed):
index c75bbf5..5d14c5e 100644
--- a/lib/pool.js
+++ b/lib/pool.js
@@ -120,7 +120,9 @@ class WorkerNodes extends EventEmitter {
this.workersQueue.remove(worker);
- if (this.canStartWorker()) this.startWorker();
+ setImmediate(() => {
+ if (this.canStartWorker()) this.startWorker();
+ });
this.processQueue();
}
nice! setImmediate() seems less disruptive
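The ordering effect of the `setImmediate()` change in Solution 2 can be illustrated in isolation. The explanation of why older node versions failed is not preserved in this thread, so the sketch below only demonstrates what deferring does: the respawn callback runs after the current `exit` handler and all other synchronous listeners have finished. The `worker` emitter and the `order` array are hypothetical stand-ins, not part of the library.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-ins for illustration only.
const order = [];
const worker = new EventEmitter();

worker.once('exit', () => {
  order.push('exit handler: start');
  // Deferred: runs on a later turn of the event loop,
  // after all synchronous code below has completed.
  setImmediate(() => order.push('respawn'));
  order.push('exit handler: end');
});

// A listener registered later still runs before the deferred respawn.
worker.once('exit', () => order.push('other exit listener'));

worker.emit('exit', 0);
order.push('after emit');

// Scheduled after the respawn callback, so 'respawn' is the last entry when this prints.
setImmediate(() => console.log(order));
```

Without `setImmediate()`, the respawn step would run synchronously inside the exit handler, before `'exit handler: end'` and before any other `exit` listener.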
3a0a6fd to 9cc42ed
@yurynix it's still not working, I'm ok with dropping node12 support, LTS is on 18 right now
@bgalek it's now not working because it seems
It started to break in this commit IMO... I'm fine with dropping node 12, do you want me to change it in this PR?
Yes please, you can just remove v12 from the matrix :)
@yurynix thank you for your contribution!
@yurynix v2.5.0 released!
Thank you for the help on this one @bgalek 🙏
Hi 👋
Thank you for your work on this great library 🙏
In my scenario, I run a task in a process that contaminates global process space, so I would like each task to have a separate process (workerEndurance = 1).
Currently, when a worker is exhausted, no other worker replaces it if the queue is empty. That makes the worker pool drop to zero and start spawning workers only when new tasks arrive, which is not ideal IMO.
This PR changes the behavior in such a way that for each exited worker, if we can accommodate it, we spawn a new one immediately.
Have a great day!
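The behavior described above can be modeled with a minimal sketch. It is not the library's implementation: `TinyPool`, `FakeWorker`, and the `minWorkers` option are hypothetical names introduced for illustration, and workers are simulated with plain event emitters instead of real processes. Only the `canStartWorker()` / `startWorker()` pairing mirrors the diff in this PR.

```javascript
const { EventEmitter } = require('events');

let nextId = 0;

// Hypothetical stand-in for a real worker process.
class FakeWorker extends EventEmitter {
  constructor() {
    super();
    this.id = nextId++;
  }

  // Simulate the process exiting, e.g. once its endurance is used up.
  exit(code = 0) {
    this.emit('exit', code);
  }
}

// Hypothetical pool that keeps `minWorkers` workers alive.
class TinyPool {
  constructor({ minWorkers }) {
    this.minWorkers = minWorkers;
    this.workers = [];
    while (this.canStartWorker()) this.startWorker();
  }

  canStartWorker() {
    return this.workers.length < this.minWorkers;
  }

  startWorker() {
    const worker = new FakeWorker();
    worker.once('exit', () => {
      // Drop the exited worker from the pool...
      this.workers = this.workers.filter(w => w !== worker);
      // ...and replace it right away instead of waiting for a new task.
      if (this.canStartWorker()) this.startWorker();
    });
    this.workers.push(worker);
    return worker;
  }
}

const pool = new TinyPool({ minWorkers: 2 });
const exhausted = pool.workers[0];
exhausted.exit();

console.log(pool.workers.length); // 2
console.log(pool.workers.some(w => w.id === exhausted.id)); // false
```

The two checks at the end match the corrected test assertion in the review: the pool is not empty after the exit, but the worker that exited is no longer in it.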