diff --git a/rclcpp/include/rclcpp/wait_result.hpp b/rclcpp/include/rclcpp/wait_result.hpp index 3384a7846a..a62f7cdcaa 100644 --- a/rclcpp/include/rclcpp/wait_result.hpp +++ b/rclcpp/include/rclcpp/wait_result.hpp @@ -176,7 +176,9 @@ class WaitResult final for (; ii < wait_set.size_of_timers(); ++ii) { if (rcl_wait_set.timers[ii] != nullptr) { ret = wait_set.timers(ii); - break; + if (ret) { + break; + } } } } @@ -217,7 +219,9 @@ class WaitResult final if (rcl_wait_set.subscriptions[ii] != nullptr) { ret = wait_set.subscriptions(ii); rcl_wait_set.subscriptions[ii] = nullptr; - break; + if (ret) { + break; + } } } } @@ -237,7 +241,9 @@ class WaitResult final if (rcl_wait_set.services[ii] != nullptr) { ret = wait_set.services(ii); rcl_wait_set.services[ii] = nullptr; - break; + if (ret) { + break; + } } } } @@ -257,7 +263,9 @@ class WaitResult final if (rcl_wait_set.clients[ii] != nullptr) { ret = wait_set.clients(ii); rcl_wait_set.clients[ii] = nullptr; - break; + if (ret) { + break; + } } } } diff --git a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp index 3b823144ec..c3e47d3a1d 100644 --- a/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp +++ b/rclcpp/include/rclcpp/wait_set_policies/dynamic_storage.hpp @@ -460,59 +460,60 @@ class DynamicStorage : public rclcpp::wait_set_policies::detail::StoragePolicyCo size_t size_of_subscriptions() const { - return shared_subscriptions_.size(); + return subscriptions_.size(); } size_t size_of_timers() const { - return shared_timers_.size(); + return timers_.size(); } size_t size_of_clients() const { - return shared_clients_.size(); + return clients_.size(); } size_t size_of_services() const { - return shared_services_.size(); + return services_.size(); } size_t size_of_waitables() const { - return shared_waitables_.size(); + return waitables_.size(); } std::shared_ptr subscriptions(size_t ii) const { - return shared_subscriptions_[ii].subscription; + return subscriptions_[ii].lock(); } std::shared_ptr timers(size_t ii) const { - return shared_timers_[ii]; + return timers_[ii].lock(); } std::shared_ptr clients(size_t ii) const { - return shared_clients_[ii]; + return clients_[ii].lock(); } std::shared_ptr services(size_t ii) const { - return shared_services_[ii]; + return services_[ii].lock(); } std::shared_ptr waitables(size_t ii) const { - return shared_waitables_[ii].waitable; + return waitables_[ii].lock(); } +private: size_t ownership_reference_counter_ = 0; SequenceOfWeakSubscriptions subscriptions_; diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index ce69da6bf2..01412de4cf 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -734,8 +734,6 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli wait_result_dirty_ = false; // this method comes from the SynchronizationPolicy this->sync_wait_result_acquire(); - // this method comes from the StoragePolicy - this->storage_acquire_ownerships(); } /// Called by the WaitResult's destructor to release resources. @@ -752,8 +750,6 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli } wait_result_holding_ = false; wait_result_dirty_ = false; - // this method comes from the StoragePolicy - this->storage_release_ownerships(); // this method comes from the SynchronizationPolicy this->sync_wait_result_release(); } diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 198c784d7c..2a61ed6e5f 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -1031,3 +1031,66 @@ TYPED_TEST(TestExecutors, dropSomeNodesWithSubscription) ASSERT_TRUE(sub1_works); } + +TYPED_TEST(TestExecutors, dropSubscriptionDuringCallback) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + + auto node = std::make_shared("test_node"); + + bool sub1_works = false; + bool sub2_works = false; + + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true); + rclcpp::SubscriptionOptions sub_ops; + sub_ops.callback_group = cbg; + + rclcpp::SubscriptionBase::SharedPtr sub1; + rclcpp::SubscriptionBase::SharedPtr sub2; + + // Note, the executor uses an unordered map internally, to order + // the entities added to the rcl waitset therefore the order of the subscriptions + // is kind of undefined. Therefore each sub deletes the other one. + sub1 = node->create_subscription("/test_drop", 10, + [&sub1_works, &sub2](const test_msgs::msg::Empty &) { + sub1_works = true; + // delete the other subscriber + sub2.reset(); + }, sub_ops); + sub2 = node->create_subscription("/test_drop", 10, + [&sub2_works, &sub1](const test_msgs::msg::Empty &) { + sub2_works = true; + // delete the other subscriber + sub1.reset(); + }, sub_ops); + + auto pub = node->create_publisher("/test_drop", 10); + + // wait for both subs to be connected + auto max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(1500); + while ((sub1->get_publisher_count() == 0) || (sub2->get_publisher_count() == 0)) { + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + executor.add_node(node); + + // publish some messages, until one subscriber fired. As both subscribers are + // connected to the same topic, they should fire in the same wait. + max_end_time = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); + while (!sub1_works && !sub2_works) { + pub->publish(test_msgs::msg::Empty()); + + // let the executor pick up the node and the timers + executor.spin_all(std::chrono::milliseconds(10)); + + const auto cur_time = std::chrono::steady_clock::now(); + ASSERT_LT(cur_time, max_end_time); + } + + // only one subscriber must have worked, as the other + // one was deleted during the callback + ASSERT_TRUE(!sub1_works || !sub2_works); +}