-
Notifications
You must be signed in to change notification settings - Fork 30.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
worker: add connect and setConnectionsListener #53488
Conversation
CC: @dygabo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this all looks fine.
- Do we want to give this an experimental status? Maybe active development?
- Should the docs mention that this is blocking?
- Does this have any effect on current worker threads performance?
Good point. I think I should.
Yes, I forgot about it.
I don't think so. The communication to accept new connections uses the same port which is already in place for communication with the parent thread. I just added a new message. |
This comment was marked as outdated.
This comment was marked as outdated.
@GeoffreyBooth Modifications done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This drives workers further away from web workers and I'm not sure what it solves since:
- The workers can't know the worker they're connecting to already set up its
setConnectionsListener
- If they do - they inherently have the knowledge of whomever created the workers (e.g. isMainThread) in that case they can already make a MessagePort and postMessage it to both workers
I won't block but I'm not a fan of this API (because it seems inherently race conditiony)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I see the direction and with the requirements of having an arbitrary thread start the unique HooksThread
I don't see an alternative to this or something similar. Personally I still believe that restricting the usage of the customization hooks (like by having the main thread be the responsible thread for the configuration of the hooks for the whole process) would be a way that would allow easier, more predictable code.
Apart from that, I found just a few small things in first pass. I would check the code in more detail in the next days and give more feedback after a few tries with #53200 using this. I'm not sure it could cover all races like the one I try to talk about: #53200 (comment)
@benjamingr why? |
bd24516
to
91d4d65
Compare
I can see it, but I think it's necessary to allow better intra-process communication.
They can, indirectly. We might need extended documentation for this, but the general idea is that workers sync up (and share their
Not really. It's really hard to properly propagate ports, especially if you think about several level of workers. With this architecture instead a worker on level 9 can advertise its
See my next comment. |
After @benjamingr and @dygabo commented I double checked and there was a possible race condition when a worker was blocked during a The |
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with @benjamingr -- it's not ideal to introduce further differences to other thread implementations.
Nowadays if you, for instance, have a 10-level workers hierarchy and you want the level 2 to speak to a worker on level 9 you have to forward a port down 7 levels somehow.
Is this a common use case? I am genuinely curious because it reminds me of anti-patterns in traditional OOP design.
Thanks, the main race condition I noticed was addressed. Now I'm trying to come up with the use case. I need to know the threadId of the worker I'm connecting to - how do I get it? Can't whomever gave it to me give me a created MessagePort instead? Also:
This sounds incorrect? It should wait for a connection listener to exist so there isn't a race between the worker setting up a connection listener and the other thread calling As for the other issue (adding more APIs to worker). I would strongly prefer it if these were static methods (that would ideally also work or can be made to work with web workers). That would address our "standards like" thing (Worker) not deviating from the spec further while it still being possible to add the API. i.e. |
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few comments and I think the API should be amended, I also didn't thoroughly review for race conditions.
I mostly agree with James that we should return synchronously and update the message port rather than return a promise if possible (rather than have 2 APIs).
That said, I understand the use case, the code seems reasonable (I haven't audited it beyond reading it, building it locally and playing a bit with it)- and this is a new experimental feature. There is plenty of time to discuss/amend it while it's being experimented with before being stable.
6cf8158
to
728f5e7
Compare
This comment was marked as outdated.
This comment was marked as outdated.
The
notable-change
Please suggest a text for the release notes if you'd like to include a more detailed summary, then proceed to update the PR description with the text or a link to the notable change suggested text comment. Otherwise, the commit will be placed in the Other Notable Changes section. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t feel qualified to review the worker code, but the docs (with a few spelling nits) and intent of this look good to me.
728f5e7
to
728a16d
Compare
const { port1, port2 } = new MessageChannel(); | ||
|
||
// Make sure the event loop does not exist before this is completed | ||
port1.ref(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ref()
and unref()
a few lines below are on a sync call path. Is it really needed?
And if it is, why is it not needed across the async await promise
below?
|
||
// Remove reference to the port used for inter-thread communication | ||
{ | ||
Mutex::ScopedLock message_ports_lock(messagePortsMutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider to move this scope before the Mutex::ScopedLock lock(mutex_);
to avoid unneeded nesting of locks.
// Remove reference to the port used for inter-thread communication | ||
{ | ||
Mutex::ScopedLock message_ports_lock(messagePortsMutex); | ||
message_ports.erase(thread_id_.id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case there are other cases to exit a worker (e.g. unhandled exception, kill from parent,...) such an erase needs to be also added in that places.
FWIW I'm not sure if these new APIs will allow to solve the problem of #53200. |
After a careful analysis, I came to a solution which is way simpler, it doesn't mess with C++ and it allows the same communication. See #53682. |
This PR adds two new API to
worker_threads
that allow for cross-thread communication viaMessagePort
s.A thread can invoke
worker.connect
to start a new connection to another thread. This method is blocking. Upon success, the return value is aMessagePort
which can be used to exchange messages.The core idea is that a thread willing to accept connections from other thread uses
worker.setConnectionsListener
to install a callback that it is invoked with a thread id, a port and (optional) data each time another thread attempts a connection.The listener can return
true
to accept the connection. Any other return value will result in the connection being refused.By default, if a thread has no listener associated, the connection will be refused.
Notable Change Text
A new set of experimental APIs has been added to
worker_threads
:connect
andsetConnectionsListener
. These APIs aim to simplify 1-1 inter-thread communication in Node.jsEvery thread (including the main one) can start a connection to any another thread (including the main one) using the
connect
API and providing the targetthreadId
.If the connection is successful, the call will return a
MessagePort
that can be used for the communication.A thread can opt-in to receive incoming connections by calling the
setConnectionsListener
API. The listener will be invoked for each connection attempt and must returntrue
to accept the connection. A thread without a connection listener will refused any connection by default.