Automatically match QoS settings across the bridge #5
Conversation
Query endpoint info for publishers to get QoS settings before bridging a topic. TODO: only create bridge once a publisher is detected (use GraphListener). TODO: integration test (use launch_testing). Signed-off-by: Jacob Perron <[email protected]>
Poll in a thread until a publisher can be queried for QoS settings. Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Looks like a good start, I've left some comments.
Note to self: review the test cases.
  }
};
auto waiting_thread = std::make_shared<std::thread>(invoke_callback_when_qos_ready);
waiting_threads_.push_back(waiting_thread);
Instead of having many waiting threads, you could have one that checks for all the topics.
That scales much better
e.g.: you can store a std::vector<std::pair<topic, callback>>,
and the thread waiting for events can check if there's a publisher on each of the topics and call the callbacks accordingly (the vector will need mutual exclusion).
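To make the suggestion concrete, here is a minimal sketch of a single shared worker guarding a std::vector<std::pair<topic, callback>> with a mutex; the class and member names and the has_publisher() predicate are illustrative, not the PR's actual code:

#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class QosWaiter
{
public:
  using Callback = std::function<void()>;

  // Register a topic to watch; called when a topic bridge is requested.
  void add(std::string topic, Callback callback)
  {
    std::lock_guard<std::mutex> lock(mutex_);  // the vector needs mutual exclusion
    topic_callback_vec_.emplace_back(std::move(topic), std::move(callback));
  }

  // Body of the single waiting thread: check every pending topic and fire
  // (then drop) the callbacks whose topic now has a publisher.
  void check_all(const std::function<bool(const std::string &)> & has_publisher)
  {
    std::vector<Callback> ready;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = topic_callback_vec_.begin();
      while (it != topic_callback_vec_.end()) {
        if (has_publisher(it->first)) {
          ready.push_back(std::move(it->second));
          it = topic_callback_vec_.erase(it);
        } else {
          ++it;
        }
      }
    }
    // Invoke outside the lock so a callback can safely register new topics.
    for (auto & callback : ready) {
      callback();
    }
  }

private:
  std::mutex mutex_;
  std::vector<std::pair<std::string, Callback>> topic_callback_vec_;
};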
I considered this and meant to try a refactor to do it. I agree it would scale much better. I'll take a look at doing it.
So, the problem is that the node used for querying topics is specific to the domain ID. In other words, I think we need at least one thread per domain ID. We could try to be a bit more clever and store a std::map<node, std::vector<std::pair<topic, callback>>>
and have exactly one node (domain ID) per thread. The logic's going to get a bit more complex.
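For reference, a sketch of the bookkeeping that per-domain design implies; the type names are illustrative (the real key would be the node created for each domain ID, and each entry would feed exactly one waiting thread):

#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "rclcpp/rclcpp.hpp"

// One vector of (topic, callback) pairs per node, i.e. per domain ID.
using TopicCallbackVec = std::vector<std::pair<std::string, std::function<void()>>>;
using WaitingMap = std::map<std::shared_ptr<rclcpp::Node>, TopicCallbackVec>;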
That sounds fine to me, it's a bit tricky but considering that in the common case (bridging from one domain to one domain) you only have one thread it seems to be worth doing.
PTAL at 4ae1095
}

// Initialize QoS arbitrarily
QosMatchInfo result_qos(endpoint_info_vec[0].qos_profile());
Suggested change:
- QosMatchInfo result_qos(endpoint_info_vec[0].qos_profile());
+ QosMatchInfo result_qos;
+ result_qos.qos.reliability(endpoint_info_vec[0].qos_profile().reliability());
+ result_qos.qos.durability(endpoint_info_vec[0].qos_profile().durability());
I think copying the liveliness and deadline of the read profile isn't a good idea.
Each of the publishers might have a different one, but the good thing is that if your subscription has the default liveliness and default deadline it will match everything.
Hmmm, yeah good point. We could also take the largest of all values for deadline and lifespan (note the QoS here is also applied to the publisher). Liveliness is trickier; I'm not sure how to handle the "manual by topic" since we are republishing to the other side. Maybe it's best to always use "automatic" for the bridge publisher. What do you think?
I'm not sure how to handle the "manual by topic" since we are republishing to the other side
A subscription with "AUTOMATIC" liveliness will match everything, and the publisher should also be automatic; otherwise you will need code doing some manual assert_liveliness calls.
We could also take the largest of all values for deadline and lifespan
Sounds fine to me; that's generally the default, but manually setting the durations to "INFINITY" is better.
Sounds fine to me; that's generally the default, but manually setting the durations to "INFINITY" is better.
I'd like to try and mimic the QoS as best we can, so I'll avoid setting the durations to infinity by default.
I think we have to make an exception for liveliness, and explicitly use AUTOMATIC.
See c62b3a1
- Always use automatic liveliness
- Use max of all deadline
- Use max of all lifespan
Updated the design doc and added tests.
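For readers following along, a rough sketch of what that combined policy could look like, assuming rclcpp's publisher endpoint info as input; the history depth is arbitrary (it cannot be introspected), the function name is illustrative, and this is not the PR's exact QosMatchInfo code:

#include <vector>

#include "rclcpp/rclcpp.hpp"

rclcpp::QoS match_publisher_qos(const std::vector<rclcpp::TopicEndpointInfo> & endpoint_info_vec)
{
  // History depth is arbitrary; history and depth cannot be introspected.
  rclcpp::QoS qos(10);

  // Reliability and durability from the first publisher (as in the suggestion above).
  const auto & first = endpoint_info_vec[0].qos_profile().get_rmw_qos_profile();
  qos.reliability(first.reliability);
  qos.durability(first.durability);

  // Always use automatic liveliness for the bridge publisher.
  qos.liveliness(RMW_QOS_POLICY_LIVELINESS_AUTOMATIC);

  // Use the max of all deadlines and lifespans.
  // Note: "default" durations ({0, 0}) effectively mean unset/infinite and would
  // need special handling in real code.
  auto less_than = [](const rmw_time_t & a, const rmw_time_t & b) {
      return a.sec < b.sec || (a.sec == b.sec && a.nsec < b.nsec);
    };
  rmw_time_t max_deadline = first.deadline;
  rmw_time_t max_lifespan = first.lifespan;
  for (const auto & info : endpoint_info_vec) {
    const auto & profile = info.qos_profile().get_rmw_qos_profile();
    if (less_than(max_deadline, profile.deadline)) {
      max_deadline = profile.deadline;
    }
    if (less_than(max_lifespan, profile.lifespan)) {
      max_lifespan = profile.lifespan;
    }
  }
  qos.deadline(max_deadline);
  qos.lifespan(max_lifespan);
  return qos;
}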
Use max of all deadline
Use max of all lifespan
IMO, those two are better chosen manually; "replicating" them doesn't seem to completely make sense.
They seem to make sense to me. If we have a publisher in domain A with a deadline, then using the same deadline for the bridge publisher on domain B will ensure subscriptions in domain B will experience similar behavior as subscriptions in domain A. Ditto for lifespan. What in particular doesn't make sense about this logic?
I will definitely expose a configuration point for users to override QoS values (e.g. via the YAML config). Expect a follow-up PR after this one.
They seem to make sense to me. If we have a publisher in domain A with a deadline, then using the same deadline for the bridge publisher on domain B will ensure subscriptions in domain B will experience similar behavior as subscriptions in domain A.
The thing is that the bridge will introduce some delay, so you typically won't be able to meet the same deadline as the original publisher.
If deadlines are important, I would rather think what's a reasonable deadline for the new domain than letting the bridge infer them.
Ditto for lifespan. What in particular doesn't make sense about this logic?
Lifespan is how long a message lives in the queue, and it's important how that is combined with the history size/kind.
Because the last two cannot be introspected, I don't think that inferring the lifespan will always lead to good results.
Anyway, keeping this logic sounds fine to me.
The thing is that the bridge will introduce some delay, so you typically won't be able to meet the same deadline as the original publisher.
Since deadline is a duration between messages, any delay introduced by the bridge should (hopefully) be systematic. Even though the total time from the original publisher to the endpoint in the other domain may take longer, the time between consecutive messages republished by the bridge should ideally remain the same as before they entered the bridge. E.g. if the original publisher is publishing at 10Hz, I would expect the bridge to also publish at 10Hz into the other domain.
Lifespan is how long a message lives in the queue, and it's important how that is combined with the history size/kind.
This is a good point. I can add a note about this potential pitfall in the design doc.
I've reconsidered and changed the default values to be max integers instead of matching available publishers in #13
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
Signed-off-by: Jacob Perron <[email protected]>
- Always use automatic liveliness - Use max of deadline policies - Use max of lifespan policies Signed-off-by: Jacob Perron <[email protected]>
Use a condition variable to notify each thread that a new callback has been registered. Signed-off-by: Jacob Perron <[email protected]>
So, the logic introduced in c62b3a1 uncovered an integer overflow bug in
looking good, only found some minor issues
cv->wait(
  lock,
  [this, &topic_callback_vec]
  {return topic_callback_vec.size() > 0u || this->shutting_down_.load();});
don't combine condition variables and atomics, this code is equivalent to:
while (!(topic_callback_vec.size() > 0u || this->shutting_down_.load())) {
// boolean flipped here and notification sent here, deadlock
cv->wait(lock);
}
This article explains the problem well. tl;dr: replace the atomic with a normal bool and lock the waiting_map_mutex_ before flipping it (no need to hold the mutex while calling notify_all() though).
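A minimal, self-contained sketch of that recommendation (member names like waiting_map_mutex_ mirror the discussion; the rest is illustrative, not the PR's code): the flag is a plain bool, it is flipped while holding the same mutex the condition variable waits on, and notify_all() is called after the lock is released.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class WaitState
{
public:
  void register_callback(std::string topic, std::function<void()> callback)
  {
    {
      std::lock_guard<std::mutex> lock(waiting_map_mutex_);
      topic_callback_vec_.emplace_back(std::move(topic), std::move(callback));
    }
    cv_.notify_all();
  }

  void shutdown()
  {
    {
      std::lock_guard<std::mutex> lock(waiting_map_mutex_);
      shutting_down_ = true;  // flipped under the lock, so a waiter cannot miss it
    }
    cv_.notify_all();  // no need to hold the mutex for the notification itself
  }

  // Blocks until there is at least one registered callback or shutdown was requested.
  void wait_for_work()
  {
    std::unique_lock<std::mutex> lock(waiting_map_mutex_);
    cv_.wait(lock, [this] {return !topic_callback_vec_.empty() || shutting_down_;});
  }

private:
  std::mutex waiting_map_mutex_;
  std::condition_variable cv_;
  bool shutting_down_ = false;  // plain bool instead of std::atomic<bool>
  std::vector<std::pair<std::string, std::function<void()>>> topic_callback_vec_;
};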
Actually, because of the logic used here, it seems that topic_callback_vec.size() > 0u is always true. Maybe just delete the condition variable code (?)
I think without the condition variable, the thread essentially runs a busy-loop when topic_callback_vec.size() == 0u. There is no work to do, but it will continue looping and waiting for graph events.
Thanks for catching the deadlock bug! If I remove the atomic from the predicate, I get other shutdown issues; Looking into it.
I think without the condition variable, the thread essentially runs a busy-loop when topic_callback_vec.size() == 0u. There is no work to do, but it will continue looping and waiting for graph events.
True, I didn't check that correctly.
Thanks for catching the deadlock bug! If I remove the atomic from the predicate, I get other shutdown issues; Looking into it.
Just to be more clear, my recommendation was to replace this line
shutting_down_.store(true);
with
{
std::lock_guard<std::mutex> lock(waiting_map_mutex_);
shutting_down_ = true;
}
and shutting_down_ can be made a bool.
Now using a regular bool for shutting_down_ and fixed some cleanup logic in the event of a SIGINT: eac644d
std::shared_ptr<rclcpp::Node>,
std::pair<std::shared_ptr<std::thread>, std::shared_ptr<std::condition_variable>>>;
I don't mind, but it doesn't seem that you really need to wrap the std::thread and the std::condition_variable in a std::shared_ptr here.
std::thread and std::condition_variable are not copyable, so I think this is the only way I can store a list of them. Correct me if I'm wrong.
std::thread and std::condition_variable are not copyable, so I think this is the only way I can store a list of them. Correct me if I'm wrong.
You have to use std::unordered_map::emplace() to avoid that issue, e.g.:
waiting_threads_.emplace(
  std::piecewise_construct,
  std::forward_as_tuple(node),
  std::forward_as_tuple(
    std::piecewise_construct, std::forward_as_tuple(invoke_callback_when_qos_ready), std::forward_as_tuple()));
so beautiful 😂
Though it's perhaps slightly less performant, using shared_ptr seems easier to understand 😅
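For comparison, a sketch of the shared_ptr-based variant; the map alias matches the one quoted above, while the insertion helper and its arguments are illustrative:

#include <condition_variable>
#include <functional>
#include <memory>
#include <thread>
#include <unordered_map>
#include <utility>

#include "rclcpp/rclcpp.hpp"

using WaitingThreadsMap = std::unordered_map<
  std::shared_ptr<rclcpp::Node>,
  std::pair<std::shared_ptr<std::thread>, std::shared_ptr<std::condition_variable>>>;

// With shared_ptr values the mapped type is copyable/movable, so a plain
// assignment works and no piecewise emplace is needed.
void add_waiting_thread(
  WaitingThreadsMap & waiting_threads,
  std::shared_ptr<rclcpp::Node> node,
  std::function<void()> invoke_callback_when_qos_ready)
{
  waiting_threads[node] = std::make_pair(
    std::make_shared<std::thread>(std::move(invoke_callback_when_qos_ready)),
    std::make_shared<std::condition_variable>());
}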
If we get a SIGINT while querying a topic's QoS, then exit cleanly. Signed-off-by: Jacob Perron <[email protected]>
} catch (const rclcpp::exceptions::RCLError & ex) {
  // If the context was shutdown, then exit cleanly
  // This can happen if we get a SIGINT
  const auto context = node->get_node_options().context();
  if (!context->is_valid()) {
    return;
  }
  throw ex;
}
Argh... this doesn't look very nice. It doesn't matter here, but it would be nice if we could fix this in rclcpp.
Yeah, I know... I think the only other way around it would be to disable the default SIGINT handler and implement our own, but that wouldn't look pretty either.
Signed-off-by: Jacob Perron <[email protected]>
I've updated the design doc to reflect the implementation.
Specifically, this change introduces a WaitForQosHandler class used for deferring topic bridge creation. It creates a thread for each topic bridge to wait for at least one publisher to be available. Once a publisher is available, it signals to the domain bridge via a callback and a topic bridge is created. In the special case of more than one publisher with different QoS settings, I've adopted a similar approach to rosbag2 for selecting a QoS that is compatible with most of the available publishers. Note, we could factor out this logic following ros2/rosbag2#601 and/or ros2/rmw#304.
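As a rough illustration of the "compatible with most publishers" idea (in the spirit of rosbag2's adaptive QoS, not the PR's exact implementation): only request the stricter reliability/durability when every detected publisher offers it, otherwise fall back so the bridge subscription still matches everyone.

#include <vector>

#include "rclcpp/rclcpp.hpp"

rclcpp::QoS select_compatible_qos(const std::vector<rclcpp::TopicEndpointInfo> & publishers)
{
  bool all_reliable = true;
  bool all_transient_local = true;
  for (const auto & info : publishers) {
    const auto & profile = info.qos_profile().get_rmw_qos_profile();
    all_reliable = all_reliable &&
      profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE;
    all_transient_local = all_transient_local &&
      profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
  }

  rclcpp::QoS qos(10);  // depth is arbitrary; history/depth cannot be introspected
  // A reliable subscription would not match a best-effort publisher, so only use
  // the stricter policy when all publishers offer it (same for durability).
  qos.reliability(
    all_reliable ?
    RMW_QOS_POLICY_RELIABILITY_RELIABLE : RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT);
  qos.durability(
    all_transient_local ?
    RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL : RMW_QOS_POLICY_DURABILITY_VOLATILE);
  return qos;
}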