Skip to content

Commit 96daf7a

Browse files
Crash in callback group pointer vector iterator
1. Mutually exclusive callback group hangs The root cause for this issue is due to a combination between the multithreaded executor and the mutually exclusive callback group used for all the ROS topics. When the executor collects all the references to the subscriptions, timers and services, it skips the mutually exclusive callback_groups which are currently locked (ie: being processed by another thread). This cause the resulting waitset to only contain the guard pointers. If there is no activity on those guards, the thread will wait for work forever in the get_next_executable and block all other threads. The resolution is to simply add a timeout for the multithreaded get_next_executable call ensuring the calling thread will eventually return. 2. Memory leak in callback group weak reference vectors There is leak in the callback group weak reference vectors that occurs when a weak reference becomes invalid (subscription is dropped, service deleted, etc). The now obsolete weak pointer reference is never deleted from the callback group pointer vector causing the leak. The resolution of this problem is implemented by scanning and deleting expired weak pointer at the time of insertion of a new weak pointer into the callback group vectors. This approach is the lowest computational cost to purging obsolete weak pointers. 3. Crash in iterator for callback group pointer vectors This problem exists because a reference to the callback group pointer vector is provided as a return value to facilitate loop iterator. This is a significant crash root cause with a multithreaded executor where a thread is able to add new subscription to the callback group. The crash is caused by a concurrent modification of the weak pointer vector while another thread is iterating over that same vector resulting in a crash. Testing: These changes where implemented and tested using a test application which creates / publish / deletes thousands of topics (up to 100,000) from a separate standalone thread while the ROS2 layer is receiving data on the topics deleted. The muiltithreaded was setup to contain 10 separate executor threads on a single mutually exclusive callback group containing thousands of topics. issue: #813 Signed-off-by: Guillaume Autran <[email protected]>
1 parent 9be3e08 commit 96daf7a

File tree

6 files changed

+141
-123
lines changed

6 files changed

+141
-123
lines changed

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,31 +56,58 @@ class CallbackGroup
5656
friend class rclcpp::node_interfaces::NodeTopics;
5757
friend class rclcpp::node_interfaces::NodeWaitables;
5858

59+
template <typename TypeT, typename Function>
60+
typename TypeT::SharedPtr _find_ptrs_if_impl(
61+
Function func, const std::vector<typename TypeT::WeakPtr>& vect_ptrs) const {
62+
std::lock_guard<std::mutex> lock(mutex_);
63+
for (auto & weak_ptr : vect_ptrs) {
64+
auto ref_ptr = weak_ptr.lock();
65+
if (ref_ptr && func(ref_ptr))
66+
return ref_ptr;
67+
}
68+
return typename TypeT::SharedPtr();
69+
}
70+
5971
public:
6072
RCLCPP_SMART_PTR_DEFINITIONS(CallbackGroup)
6173

6274
RCLCPP_PUBLIC
6375
explicit CallbackGroup(CallbackGroupType group_type);
6476

77+
template <typename Function>
6578
RCLCPP_PUBLIC
66-
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
67-
get_subscription_ptrs() const;
79+
rclcpp::SubscriptionBase::SharedPtr
80+
find_subscription_ptrs_if(Function func) const {
81+
return _find_ptrs_if_impl<rclcpp::SubscriptionBase, Function>(func, subscription_ptrs_);
82+
}
6883

84+
template <typename Function>
6985
RCLCPP_PUBLIC
70-
const std::vector<rclcpp::TimerBase::WeakPtr> &
71-
get_timer_ptrs() const;
86+
rclcpp::TimerBase::SharedPtr
87+
find_timer_ptrs_if(Function func) const {
88+
return _find_ptrs_if_impl<rclcpp::TimerBase, Function>(func, timer_ptrs_);
89+
}
7290

91+
template <typename Function>
7392
RCLCPP_PUBLIC
74-
const std::vector<rclcpp::ServiceBase::WeakPtr> &
75-
get_service_ptrs() const;
93+
rclcpp::ServiceBase::SharedPtr
94+
find_service_ptrs_if(Function func) const {
95+
return _find_ptrs_if_impl<rclcpp::ServiceBase, Function>(func, service_ptrs_);
96+
}
7697

98+
template <typename Function>
7799
RCLCPP_PUBLIC
78-
const std::vector<rclcpp::ClientBase::WeakPtr> &
79-
get_client_ptrs() const;
100+
rclcpp::ClientBase::SharedPtr
101+
find_client_ptrs_if(Function func) const {
102+
return _find_ptrs_if_impl<rclcpp::ClientBase, Function>(func, client_ptrs_);
103+
}
80104

105+
template <typename Function>
81106
RCLCPP_PUBLIC
82-
const std::vector<rclcpp::Waitable::WeakPtr> &
83-
get_waitable_ptrs() const;
107+
rclcpp::Waitable::SharedPtr
108+
find_waitable_ptrs_if(Function func) const {
109+
return _find_ptrs_if_impl<rclcpp::Waitable, Function>(func, waitable_ptrs_);
110+
}
84111

85112
RCLCPP_PUBLIC
86113
std::atomic_bool &

rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -164,40 +164,30 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
164164
if (!group || !group->can_be_taken_from().load()) {
165165
continue;
166166
}
167-
for (auto & weak_subscription : group->get_subscription_ptrs()) {
168-
auto subscription = weak_subscription.lock();
169-
if (subscription) {
167+
group->find_subscription_ptrs_if([this](const auto& subscription) {
170168
subscription_handles_.push_back(subscription->get_subscription_handle());
171169
if (subscription->get_intra_process_subscription_handle()) {
172170
subscription_handles_.push_back(
173171
subscription->get_intra_process_subscription_handle());
174172
}
175-
}
176-
}
177-
for (auto & weak_service : group->get_service_ptrs()) {
178-
auto service = weak_service.lock();
179-
if (service) {
173+
return false;
174+
});
175+
group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr& service) {
180176
service_handles_.push_back(service->get_service_handle());
181-
}
182-
}
183-
for (auto & weak_client : group->get_client_ptrs()) {
184-
auto client = weak_client.lock();
185-
if (client) {
177+
return false;
178+
});
179+
group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr& client) {
186180
client_handles_.push_back(client->get_client_handle());
187-
}
188-
}
189-
for (auto & weak_timer : group->get_timer_ptrs()) {
190-
auto timer = weak_timer.lock();
191-
if (timer) {
181+
return false;
182+
});
183+
group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr& timer) {
192184
timer_handles_.push_back(timer->get_timer_handle());
193-
}
194-
}
195-
for (auto & weak_waitable : group->get_waitable_ptrs()) {
196-
auto waitable = weak_waitable.lock();
197-
if (waitable) {
185+
return false;
186+
});
187+
group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr& waitable) {
198188
waitable_handles_.push_back(waitable);
199-
}
200-
}
189+
return false;
190+
});
201191
}
202192
}
203193
return has_invalid_weak_nodes;

rclcpp/src/rclcpp/callback_group.cpp

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,40 +23,6 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type)
2323
: type_(group_type), can_be_taken_from_(true)
2424
{}
2525

26-
const std::vector<rclcpp::SubscriptionBase::WeakPtr> &
27-
CallbackGroup::get_subscription_ptrs() const
28-
{
29-
std::lock_guard<std::mutex> lock(mutex_);
30-
return subscription_ptrs_;
31-
}
32-
33-
const std::vector<rclcpp::TimerBase::WeakPtr> &
34-
CallbackGroup::get_timer_ptrs() const
35-
{
36-
std::lock_guard<std::mutex> lock(mutex_);
37-
return timer_ptrs_;
38-
}
39-
40-
const std::vector<rclcpp::ServiceBase::WeakPtr> &
41-
CallbackGroup::get_service_ptrs() const
42-
{
43-
std::lock_guard<std::mutex> lock(mutex_);
44-
return service_ptrs_;
45-
}
46-
47-
const std::vector<rclcpp::ClientBase::WeakPtr> &
48-
CallbackGroup::get_client_ptrs() const
49-
{
50-
std::lock_guard<std::mutex> lock(mutex_);
51-
return client_ptrs_;
52-
}
53-
54-
const std::vector<rclcpp::Waitable::WeakPtr> &
55-
CallbackGroup::get_waitable_ptrs() const
56-
{
57-
std::lock_guard<std::mutex> lock(mutex_);
58-
return waitable_ptrs_;
59-
}
6026

6127
std::atomic_bool &
6228
CallbackGroup::can_be_taken_from()
@@ -76,34 +42,64 @@ CallbackGroup::add_subscription(
7642
{
7743
std::lock_guard<std::mutex> lock(mutex_);
7844
subscription_ptrs_.push_back(subscription_ptr);
45+
subscription_ptrs_.erase(
46+
std::remove_if(
47+
subscription_ptrs_.begin(),
48+
subscription_ptrs_.end(),
49+
[](rclcpp::SubscriptionBase::WeakPtr x){return x.expired();}),
50+
subscription_ptrs_.end());
7951
}
8052

8153
void
8254
CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
8355
{
8456
std::lock_guard<std::mutex> lock(mutex_);
8557
timer_ptrs_.push_back(timer_ptr);
58+
timer_ptrs_.erase(
59+
std::remove_if(
60+
timer_ptrs_.begin(),
61+
timer_ptrs_.end(),
62+
[](rclcpp::TimerBase::WeakPtr x){return x.expired();}),
63+
timer_ptrs_.end());
8664
}
8765

8866
void
8967
CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
9068
{
9169
std::lock_guard<std::mutex> lock(mutex_);
9270
service_ptrs_.push_back(service_ptr);
71+
service_ptrs_.erase(
72+
std::remove_if(
73+
service_ptrs_.begin(),
74+
service_ptrs_.end(),
75+
[](rclcpp::ServiceBase::WeakPtr x){return x.expired();}),
76+
service_ptrs_.end());
9377
}
9478

9579
void
9680
CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
9781
{
9882
std::lock_guard<std::mutex> lock(mutex_);
9983
client_ptrs_.push_back(client_ptr);
84+
client_ptrs_.erase(
85+
std::remove_if(
86+
client_ptrs_.begin(),
87+
client_ptrs_.end(),
88+
[](rclcpp::ClientBase::WeakPtr x){return x.expired();}),
89+
client_ptrs_.end());
10090
}
10191

10292
void
10393
CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
10494
{
10595
std::lock_guard<std::mutex> lock(mutex_);
10696
waitable_ptrs_.push_back(waitable_ptr);
97+
waitable_ptrs_.erase(
98+
std::remove_if(
99+
waitable_ptrs_.begin(),
100+
waitable_ptrs_.end(),
101+
[](rclcpp::Waitable::WeakPtr x){return x.expired();}),
102+
waitable_ptrs_.end());
107103
}
108104

109105
void

rclcpp/src/rclcpp/executor.cpp

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,12 @@ Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
511511
if (!group) {
512512
continue;
513513
}
514-
for (auto & weak_timer : group->get_timer_ptrs()) {
515-
auto t = weak_timer.lock();
516-
if (t == timer) {
517-
return group;
518-
}
514+
auto timer_ref = group->find_timer_ptrs_if(
515+
[timer](const rclcpp::TimerBase::SharedPtr& timer_ptr)->bool {
516+
return (timer_ptr == timer);
517+
});
518+
if (timer_ref) {
519+
return group;
519520
}
520521
}
521522
}
@@ -535,14 +536,15 @@ Executor::get_next_timer(AnyExecutable & any_exec)
535536
if (!group || !group->can_be_taken_from().load()) {
536537
continue;
537538
}
538-
for (auto & timer_ref : group->get_timer_ptrs()) {
539-
auto timer = timer_ref.lock();
540-
if (timer && timer->is_ready()) {
541-
any_exec.timer = timer;
542-
any_exec.callback_group = group;
543-
node = get_node_by_group(group);
544-
return;
545-
}
539+
auto timer_ref = group->find_timer_ptrs_if(
540+
[](const rclcpp::TimerBase::SharedPtr& timer)->bool {
541+
return timer->is_ready();
542+
});
543+
if (timer_ref) {
544+
any_exec.timer = timer_ref;
545+
any_exec.callback_group = group;
546+
node = get_node_by_group(group);
547+
return;
546548
}
547549
}
548550
}

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ MultiThreadedExecutor::run(size_t)
7777
if (!rclcpp::ok(this->context_) || !spinning.load()) {
7878
return;
7979
}
80-
if (!get_next_executable(any_exec)) {
80+
if (!get_next_executable(any_exec, std::chrono::milliseconds(500))) {
8181
continue;
8282
}
8383
if (any_exec.timer) {

0 commit comments

Comments
 (0)