Feat: Add abort signal support for worker execution #441
Thanks for your PR Josh, it looks well taken care of 👌. Can you explain what the use cases are for this new abort signal API? I suppose it is meant to, for example, cancel running fetch actions or close a DB connection? I am trying to understand what its capabilities and limitations are. For instance: I suppose it can be used in both the browser and Node.js? And I suppose it will not work when the task is running a synchronous operation, only async? Before this feature is ready to use it indeed has to work with both the
A real-world example would help me understand how this is to be used; can you give an example?
On a side note: I expect to reply only after the weekend, I'm away for a couple of days.
Sure! The main use case, assuming the current implementation, is to allow cleanup for async operations that have failure cases and need some cleanup, which might also need top level
const workerpool = require('workerpool');

async function queryRunner(signal, queryString) {
  const db = new DbController({}); // some database client
  await db.connect();
  const query = db.query(queryString);
  signal.addEventListener('abort', async () => {
    query.cancel(); // cancel the query operation
  });
  // Some call to start the query and block until resolution.
  // If this promise rejects, the abort listener above will handle the cleanup.
  return query.execute();
}

workerpool.worker({
  queryRunner: queryRunner
});
There is also some benefit with the current implementation if considering
const workerpool = require('workerpool');
// named delay rather than setTimeout, so the global setTimeout is not shadowed
async function delay(signal, ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });
    // some implementation which might cause a promise reject.
  });
}

workerpool.worker({
  delay: delay
});
The majority of the value comes from integration with |
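The abortable timer pattern from the comment above can be tried without workerpool. The following is a minimal sketch under assumptions: `abortableDelay` and `cleanupCount` are illustrative names that do not appear in the PR, and the `AbortController` is created by hand here instead of by the pool. It passes the signal as the first argument, as in the examples above.

```javascript
// Sketch only: abortableDelay and cleanupCount are illustrative names.
let cleanupCount = 0;

function abortableDelay(signal, ms) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('aborted before start'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // release the pending timer
      cleanupCount += 1;   // lets a test observe that the cleanup ran
      reject(new Error('aborted'));
    };
    const timer = setTimeout(() => {
      // normal completion: the abort listener is no longer needed
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Stand-in for whatever component would own the controller.
const controller = new AbortController();
const pending = abortableDelay(controller.signal, 60000);
pending.catch((err) => console.log('delay rejected:', err.message));

// Abort listeners are dispatched synchronously, so the timer is cleared here.
controller.abort();
```

Because the listener clears the timer, the process does not stay alive for the full 60 seconds after the abort.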
Thanks for your explanation, I'll have a look at it asap. |
This use case definitely makes sense, thanks for the explanation.
I made a few inline comments, but most importantly I would like to think through the API: how to pass the signal to the function?
It feels a bit weird to prepend the arguments with an extra argument signal. How about attaching the signal to the method, so you can use it inside the function like this.signal.addEventListener(...)? Or introduce a global worker.onAbort, or so?
I would like to see if we can get rid of the need for useAbortSignal. Suppose that we go for using this.signal or a global worker.onAbort, then there is no need for useAbortSignal. The workerpool can automatically create an AbortController and attach it to every registered method. The only thing is that we need to make sure the added listeners are removed again at the end of the method call. If that's not possible we can probably create a new AbortController right before every call, and ideally create it lazily. We should look into any performance impact then, though. We could also think through whether we want to use AbortController or instead create a custom callback like this.onAbort or a global worker.onAbort. In that case we can optimize it ourselves, like: create it only once, keep a list with callbacks in an array, and simply clear this list right before every method invocation. What do you think?
test/Pool.test.js (outdated)
@@ -475,6 +476,26 @@ describe('Pool', function () {
    });
  });

  it('should respect signal and emit event from worker', function (done) {
Is the abort signal working right now for rejections, timeout and cancel cases? Can you add a unit test for each of these three cases?
At this time the PR does not have support for triggering the abort when timeout or cancel is invoked. I am close to having this part complete but have an edge case I am working through.
I think this makes sense. I was originally going to implement this by injecting the
const workerpool = require('workerpool');

// named delay rather than setTimeout, so the global setTimeout is not shadowed
async function delay(ms) {
  return await new Promise((resolve) => {
    const timer = setTimeout(() => {
      resolve();
    }, ms);
    this.signal.addEventListener('abort', () => {
      clearTimeout(timer);
    });
    // some implementation which might cause a promise reject.
  });
}

workerpool.worker({
  delay: delay
});
The reason I ended up adding it as an option was to make it more obvious that the signals are now in scope. However, I now agree that it's a better pattern, as modifying the function signature from what the end user has defined is not ideal. And it would be better to simply inject the
I've updated the PR to no longer pass the
I realized I was not using the
The downside of the current implementation is that if functions are not defined in a script context which is loaded into the worker on time of creation, we are not currently able to inject a
I think we can weigh if having signaling support when workers are executed through the
Thanks a lot, I like where this is heading!
The downside of the current implementation is that if functions are not defined in a script context which is loaded into the worker on time of creation
I'm not entirely sure if I understand what you mean. Do you mean that the signals do not work for "offloaded functions"?
src/worker.js (outdated)
  worker.send({
    id: request.id,
    result: null,
    aborted: controller ? true : false,
What is the reason for adding aborted: boolean to the message with the result? I think you can already know that because error !== null then?
The boolean is for the case of "offloaded functions", since they might result in an error !== null but do not have a controller associated with the execution. So this boolean allows us to be sure that the event handler should be invoked.
src/worker.js (outdated)
@@ -140,14 +141,19 @@ worker.on('message', function (request) {
  if (request === TERMINATE_METHOD_ID) {
    return worker.cleanupAndExit(0);
  }

  const controller = request.method != 'methods' && request.method != 'run' ? new AbortController() : undefined;
Right now the code creates a new AbortController for every method execution, right? I would like to know whether this has any performance impact. If so, we need to come up with something smart (like lazy instantiation or so).
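Lazy instantiation, as suggested here, could be sketched as follows. This is an assumption-laden illustration, not the PR's code: `createTaskContext` and `controllersCreated` are hypothetical names, and the context object is bound as `this` by hand instead of by the worker.

```javascript
// Hypothetical sketch: createTaskContext is an illustrative helper, not workerpool code.
let controllersCreated = 0;

function createTaskContext() {
  let controller = null;
  return {
    // The controller is only allocated when task code first reads `signal`.
    get signal() {
      if (controller === null) {
        controller = new AbortController();
        controllersCreated += 1;
      }
      return controller.signal;
    },
    // Aborting is a no-op when the task never asked for the signal.
    abort() {
      if (controller !== null) {
        controller.abort();
      }
    },
  };
}

// A task that ignores the signal: no controller is ever created for it.
function plainTask(a, b) {
  return a + b;
}

// A task that reads the signal: the controller is created on first access.
function signalTask() {
  return this.signal.aborted;
}

const contextA = createTaskContext();
const sum = plainTask.apply(contextA, [2, 3]);
contextA.abort();

const contextB = createTaskContext();
const abortedBefore = signalTask.apply(contextB, []);
contextB.abort();
const abortedAfter = contextB.signal.aborted;
```

With this shape, tasks that never touch the signal pay only for a small plain object per call; whether that difference is measurable would still need the benchmark mentioned in the thread.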
I don't think there will be much of a performance hit, since we are only checking properties within an object, but I can run some benchmarks to see if there is any performance cost.
test/workers/abort.js (outdated)
  const me = this;
  return new Promise(function (_resolve, reject) {
    me.signal.onabort = () => {
How can we know for sure that this callback is triggered during the unit test? Shouldn't it call something or log something?
I'll update it to increment a counter in the event handler and assert on its value, so that the test fails if the event handler isn't triggered.
I think so, yes. Below is the implementation of the
/**
 * Execute a function with provided arguments
 * @param {String} fn     Stringified function
 * @param {Array} [args]  Function arguments
 * @returns {*}
 */
worker.methods.run = function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(null, arguments);');
  return f.apply(f, args);
};
The above implementation encapsulates the execution of an
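The effect on offloaded functions can be reproduced outside the worker. In the sketch below, `run` copies the shape of the method quoted above, while `runWithContext` and `hasSignal` are hypothetical names added for illustration; forwarding a context this way is one possible approach and is not something the PR is confirmed to do.

```javascript
// Same shape as the `run` method quoted above: the stringified function
// is applied with `null`, so nothing can be attached to its `this`.
function run(fn, args) {
  var f = new Function('return (' + fn + ').apply(null, arguments);');
  return f.apply(f, args);
}

// Hypothetical variant: forwards a context object as `this`.
function runWithContext(fn, args, context) {
  var f = new Function('return (' + fn + ').apply(this, arguments);');
  return f.apply(context, args);
}

// An offloaded function that looks for a signal on `this`.
function hasSignal() {
  return this != null && typeof this.signal === 'object';
}

const withoutContext = run(String(hasSignal), []);
const withContext = runWithContext(String(hasSignal), [], {
  signal: new AbortController().signal,
});

console.log(withoutContext, withContext);
```

The first call finds no signal because the function is rebuilt from its source text and invoked without a context; the second finds one only because the caller supplies it explicitly.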
Thanks for the updates! Some thoughts: the old behavior was that when a running task is cancelled or times out, the worker running that task is terminated. The new behavior is that before termination, an abort event is delivered to the worker so it can neatly close database connections or open requests for example. With this in mind:
Some thoughts about the implementation:
|
@joshLong145 I see you commented yesterday but removed your comment again. Any thoughts on this? |
I took your advice and removed all my logic for managing controllers, and went with a single controller that is invoked by a message sent from the handler's processing of the timeout or cancel exception.
I was looking into this in more depth, and noticed that the
Here is a working example demonstrating how to close a db connection on termination (after a cancel or timeout):
// main.js
const workerpool = require("workerpool");

const pool = workerpool.pool(__dirname + "/workers/cancelWorker.js", {
  workerType: "thread"
})

const main = async () => {
  await pool
    .exec("runDbQuery", [2, 3])
    // .timeout(2000) // the timeout will trigger a termination, which neatly closes the connection beforehand
    .then((result) => console.log('runDbQuery result', result))
    .catch((err) => console.error(err))

  await pool.terminate()
};

main();
With the following worker:
// cancelWorker.js
const workerpool = require("workerpool");

// note: we could also keep an array with multiple connections,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let dbConnection = null

async function openConnection() {
  // we mimic an open db connection by a simple number
  const newConnection = Math.round(Math.random() * 1000)
  console.log(`Opening db connection ${newConnection}`)
  return newConnection
}

async function closeConnection(connection) {
  console.log(`Closing db connection ${connection}...`)
  await sleep(500)
  console.log(`db connection ${connection} closed`)
  // reset the global; the parameter has its own name so it does not shadow it
  dbConnection = null
}

async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)
  return result
}

function sleep(delay) {
  return new Promise(resolve => setTimeout(resolve, delay))
}

// create a worker and register public functions
workerpool.worker(
  {
    runDbQuery
  },
  {
    onTerminate: function (code) {
      if (dbConnection) {
        return closeConnection(dbConnection)
      }
    },
  }
)
Now, we could think through adding some "eye candy" on top of
async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // register the closeConnection function via worker.onAbort,
  // so it will be invoked when the worker terminates after a cancel or timeout
  worker.onAbort = () => closeConnection(dbConnection)

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)
  return result
}
To implement this, I think all we have to do is add a section to
Thanks for the reply. I see your perspective with
Also, with the example you add for how
async function runDbQuery(a, b) {
  // register the connection in a global variable
  dbConnection = await openConnection()

  // register the closeConnection function via worker.onAbort,
  // so it will be invoked when the worker terminates after a cancel or timeout
  worker.onAbort = () => closeConnection(dbConnection)

  // pretend to run a query
  console.log(`Executing query via db connection ${dbConnection}...`)
  await sleep(5000)
  const result = a + b
  console.log(`Query results are in: ${result}`)

  await closeConnection(dbConnection)
  return result
}
In the above
So I'm unsure how a
Sorry about the confusion about
I think an
// cancelWorker.js
const workerpool = require("workerpool");

// note: we could also keep an array with multiple controllers,
// but we know that a worker only executes one function at a time,
// so there is no need for that.
let myAbortController = null

async function runFetch(url) {
  myAbortController = new AbortController()

  // You can use myAbortController.signal.addEventListener if there is
  // more stuff to be taken care of when `fetch` triggers the abort action
  return fetch(url, { signal: myAbortController.signal })
}

// create a worker and register public functions
workerpool.worker(
  {
    runFetch
  },
  {
    onTerminate: function (code) {
      if (myAbortController) {
        myAbortController.abort()
      }
    }
  }
)
I may be overlooking something. If that is the case, can you share an example to explain it?
Ah, thank you for the clarification on
I think we have a good path forward with
|
Sounds good! I think the Do you have thoughts on how to best expose the |
|
c09b21b to cff840f
Has been implemented in PR #448 |
Adds support for Abort Signals through instances of AbortController for each task execution context. Functionality is enabled through the useAbortSignal flag on the execOptions object provided to the Worker exec method.
Example:
Next Steps
To further the value of abort signaling, functionality should be added to the Promise.timeout wrapper created at the time of task execution. It would bridge to the worker instance executing the task and explicitly call abort on the signal created when the task began its execution, allowing cleanup when a task times out from the main thread context.
If the functionality within this PR is merged, the above outline can be turned into an issue and mentioned here for tracking.