Skip to content

Commit 9c493c4

Browse files
jmachowinskiJanosch Machowinski
andauthored
Executor strong reference fix (#2745)
#2678 reported that the executor was holding strong references to entities during execution. This commit adds a regression test for this. * fix(Executor): Don't hold strong references to entities during spin This fixes a bug, were dropping an entity during a callback would not prevent further callbacks. Note, there might still be a race in the case of the mutithreaded executor. Signed-off-by: Janosch Machowinski <[email protected]> Co-authored-by: Janosch Machowinski <[email protected]>
1 parent 34cc960 commit 9c493c4

File tree

4 files changed

+86
-18
lines changed

4 files changed

+86
-18
lines changed

rclcpp/include/rclcpp/wait_result.hpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,9 @@ class WaitResult final
176176
for (; ii < wait_set.size_of_timers(); ++ii) {
177177
if (rcl_wait_set.timers[ii] != nullptr) {
178178
ret = wait_set.timers(ii);
179-
break;
179+
if (ret) {
180+
break;
181+
}
180182
}
181183
}
182184
}
@@ -217,7 +219,9 @@ class WaitResult final
217219
if (rcl_wait_set.subscriptions[ii] != nullptr) {
218220
ret = wait_set.subscriptions(ii);
219221
rcl_wait_set.subscriptions[ii] = nullptr;
220-
break;
222+
if (ret) {
223+
break;
224+
}
221225
}
222226
}
223227
}
@@ -237,7 +241,9 @@ class WaitResult final
237241
if (rcl_wait_set.services[ii] != nullptr) {
238242
ret = wait_set.services(ii);
239243
rcl_wait_set.services[ii] = nullptr;
240-
break;
244+
if (ret) {
245+
break;
246+
}
241247
}
242248
}
243249
}
@@ -257,7 +263,9 @@ class WaitResult final
257263
if (rcl_wait_set.clients[ii] != nullptr) {
258264
ret = wait_set.clients(ii);
259265
rcl_wait_set.clients[ii] = nullptr;
260-
break;
266+
if (ret) {
267+
break;
268+
}
261269
}
262270
}
263271
}

rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -460,59 +460,60 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo
460460

461461
size_t size_of_subscriptions() const
462462
{
463-
return shared_subscriptions_.size();
463+
return subscriptions_.size();
464464
}
465465

466466
size_t size_of_timers() const
467467
{
468-
return shared_timers_.size();
468+
return timers_.size();
469469
}
470470

471471
size_t size_of_clients() const
472472
{
473-
return shared_clients_.size();
473+
return clients_.size();
474474
}
475475

476476
size_t size_of_services() const
477477
{
478-
return shared_services_.size();
478+
return services_.size();
479479
}
480480

481481
size_t size_of_waitables() const
482482
{
483-
return shared_waitables_.size();
483+
return waitables_.size();
484484
}
485485

486486
std::shared_ptr<rclcpp::SubscriptionBase>
487487
subscriptions(size_t ii) const
488488
{
489-
return shared_subscriptions_[ii].subscription;
489+
return subscriptions_[ii].lock();
490490
}
491491

492492
std::shared_ptr<rclcpp::TimerBase>
493493
timers(size_t ii) const
494494
{
495-
return shared_timers_[ii];
495+
return timers_[ii].lock();
496496
}
497497

498498
std::shared_ptr<rclcpp::ClientBase>
499499
clients(size_t ii) const
500500
{
501-
return shared_clients_[ii];
501+
return clients_[ii].lock();
502502
}
503503

504504
std::shared_ptr<rclcpp::ServiceBase>
505505
services(size_t ii) const
506506
{
507-
return shared_services_[ii];
507+
return services_[ii].lock();
508508
}
509509

510510
std::shared_ptr<rclcpp::Waitable>
511511
waitables(size_t ii) const
512512
{
513-
return shared_waitables_[ii].waitable;
513+
return waitables_[ii].lock();
514514
}
515515

516+
private:
516517
size_t ownership_reference_counter_ = 0;
517518

518519
SequenceOfWeakSubscriptions subscriptions_;

rclcpp/include/rclcpp/wait_set_template.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -734,8 +734,6 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
734734
wait_result_dirty_ = false;
735735
// this method comes from the SynchronizationPolicy
736736
this->sync_wait_result_acquire();
737-
// this method comes from the StoragePolicy
738-
this->storage_acquire_ownerships();
739737
}
740738

741739
/// Called by the WaitResult's destructor to release resources.
@@ -752,8 +750,6 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli
752750
}
753751
wait_result_holding_ = false;
754752
wait_result_dirty_ = false;
755-
// this method comes from the StoragePolicy
756-
this->storage_release_ownerships();
757753
// this method comes from the SynchronizationPolicy
758754
this->sync_wait_result_release();
759755
}

rclcpp/test/rclcpp/executors/test_executors.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,3 +1099,66 @@ TYPED_TEST(TestExecutors, dropSomeNodesWithSubscription)
10991099

11001100
ASSERT_TRUE(sub1_works);
11011101
}
1102+
1103+
TYPED_TEST(TestExecutors, dropSubscriptionDuringCallback)
1104+
{
1105+
using ExecutorType = TypeParam;
1106+
ExecutorType executor;
1107+
1108+
1109+
auto node = std::make_shared<rclcpp::Node>("test_node");
1110+
1111+
bool sub1_works = false;
1112+
bool sub2_works = false;
1113+
1114+
auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true);
1115+
rclcpp::SubscriptionOptions sub_ops;
1116+
sub_ops.callback_group = cbg;
1117+
1118+
rclcpp::SubscriptionBase::SharedPtr sub1;
1119+
rclcpp::SubscriptionBase::SharedPtr sub2;
1120+
1121+
// Note, the executor uses an unordered map internally, to order
1122+
// the entities added to the rcl waitset therefore the order of the subscriptions
1123+
// is kind of undefined. Therefore each sub deletes the other one.
1124+
sub1 = node->create_subscription<test_msgs::msg::Empty>("/test_drop", 10,
1125+
[&sub1_works, &sub2](const test_msgs::msg::Empty &) {
1126+
sub1_works = true;
1127+
// delete the other subscriber
1128+
sub2.reset();
1129+
}, sub_ops);
1130+
sub2 = node->create_subscription<test_msgs::msg::Empty>("/test_drop", 10,
1131+
[&sub2_works, &sub1](const test_msgs::msg::Empty &) {
1132+
sub2_works = true;
1133+
// delete the other subscriber
1134+
sub1.reset();
1135+
}, sub_ops);
1136+
1137+
auto pub = node->create_publisher<test_msgs::msg::Empty>("/test_drop", 10);
1138+
1139+
// wait for both subs to be connected
1140+
auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500);
1141+
while ((sub1->get_publisher_count() == 0) || (sub2->get_publisher_count() == 0)) {
1142+
const auto cur_time = std::chrono::steady_clock::now();
1143+
ASSERT_LT(cur_time, max_end_time);
1144+
}
1145+
1146+
executor.add_node(node);
1147+
1148+
// publish some messages, until one subscriber fired. As both subscribers are
1149+
// connected to the same topic, they should fire in the same wait.
1150+
max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
1151+
while (!sub1_works && !sub2_works) {
1152+
pub->publish(test_msgs::msg::Empty());
1153+
1154+
// let the executor pick up the node and the timers
1155+
executor.spin_all(std::chrono::milliseconds(10));
1156+
1157+
const auto cur_time = std::chrono::steady_clock::now();
1158+
ASSERT_LT(cur_time, max_end_time);
1159+
}
1160+
1161+
// only one subscriber must have worked, as the other
1162+
// one was deleted during the callback
1163+
ASSERT_TRUE(!sub1_works || !sub2_works);
1164+
}

0 commit comments

Comments
 (0)