-
Notifications
You must be signed in to change notification settings - Fork 417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Redesigning the MultiThreadedExecutor #1276
Closed
Closed
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
9f10f30
Add document
audrow 8d787b4
Fix indents
audrow 63017ee
Reword first sentences
audrow e46398f
Make wording clearer
audrow e17e1f7
Remove spaces at ends of lines
audrow 21b7628
Add clarification that the second method doesn't fix timers
audrow 7315b38
Add an example for returning a lambda function
audrow f280c2c
Add an example for the preparation idea
audrow File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
====================================== | ||
:code:`MultiThreadedExecutor` Redesign | ||
====================================== | ||
|
||
:code:`MultiThreadedExecutor` may attempt to execute one executable object multiple times. | ||
This occurs because an executable object can be identified as ready in multiple threads before it has been executed, and each of these threads will then try to execute it. | ||
To fix this, a general solution should do the following: | ||
|
||
* Work for all the types of executable objects: timers, subscriptions, services, clients, and waitables | ||
* Avoid spurious wake ups (which can be costly) | ||
|
||
Summary of discussion so far | ||
---------------------------- | ||
|
||
Broadly speaking, two approaches were discussed: | ||
|
||
1. **Handle no data in the executor**: If more than one thread is trying to execute the same entity and that entity is consumed after it is executed once, check for the executable before trying to execute it: | ||
|
||
.. code-block:: c++ | ||
|
||
void execute() { | ||
if(buffer->has_data()) { | ||
auto data = buffer->consume_data(); | ||
run_any_callback(data); | ||
} else { | ||
// do nothing | ||
} | ||
} | ||
|
||
:Advantages: | ||
* simplicity | ||
* extends interface in a way that generalizes well for all other types of executable objects (i.e., timers) | ||
:Disadvantages: | ||
* spurious wake up calls | ||
* needs to be documented well so every class that executes :code:`AnyExecutable` objects is thread-safe and doesn't hide a race condition | ||
|
||
|
||
2. **Take the data**: This approach takes the data when collecting the ready entities, and that process is done in a mutually exclusive fashion. Thus, this approach may avoid spurious wake ups. | ||
|
||
.. code-block:: c++ | ||
|
||
// called first | ||
void take_data(std::shared_ptr<void> & data) { | ||
data = buffer->consume_data(); | ||
} | ||
|
||
// called second | ||
void execute(std::shared_ptr<void> & data) { | ||
run_any_callback(data_); | ||
} | ||
|
||
:Advantages: | ||
* fixes spurious wake ups for events where :code:`take_data` has a meaning (i.e., not timers) | ||
:Disadvantages: | ||
* more complex | ||
* extends the interface in a way that doesn't generalize well for all other types of executable objects (i.e., timers) | ||
* type erasure of data | ||
|
||
.. note:: | ||
|
||
To avoid erasing the type of the data, :code:`take_data` could be replaced with a :code:`get_handle` method that would return a lambda function. | ||
|
||
.. code-block:: c++ | ||
|
||
// called first | ||
std::function<void()> get_handle() { | ||
return [data=buffer->consume_data()]() { | ||
run_any_callback(data); | ||
}; | ||
} | ||
|
||
// called second | ||
void execute(std::function<void()> handle) { | ||
handle(); | ||
} | ||
|
||
Ideas | ||
----------------------- | ||
|
||
* Add a preparation method to get the event after :code:`take_data`: | ||
|
||
.. code-block:: c++ | ||
|
||
std::shared_ptr<void> & data_; | ||
|
||
// called first | ||
void take_data() { | ||
data_ = buffer->consume_data(); | ||
} | ||
|
||
// called second | ||
std::shared_ptr<void> get_prepared() { | ||
return data_; | ||
} | ||
|
||
// called third, with the data returned in the previous step | ||
void execute(std::shared_ptr<void> & data) { | ||
run_any_callback(data); | ||
} | ||
|
||
|
||
* Store conditions in the waitset (like DDS) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that having the concept of conditions (like DDS) is nice, but I'm not sure what this item means exactly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We spoke about it in our meeting. I'm not sure that I understand it well enough to explain.
It was something like, we could do what DDS does, which is use conditions that can be peaked at or read. Once a condition is read, it is removed from the queue, so they don't come up again. I believe this would avoid spurious wakeups. @wjwwood, is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently in a ROS 2 wait set you can add different kind of entities (subscription, service, ...).
In
DDS
, you can only add conditions to a WaitSet.Conditions API is quite simple, you can only check if them are ready or dispatch an attached callback.
There's no concept of "reading" or "peaking" a condition AFAIU.
I think that having something similar to DDS conditions will simplify our custom handling of different cases in wait set and executor code a lot (and the abstractions looks nice too 😃 ).