diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 5e1157f204..3edebaecbd 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -56,8 +56,44 @@ class CallbackGroup public: RCLCPP_SMART_PTR_DEFINITIONS(CallbackGroup) + /// Constructor for CallbackGroup. + /** + * Callback Groups have a type, either 'Mutually Exclusive' or 'Reentrant' + * and when creating one the type must be specified. + * + * Callbacks in Reentrant Callback Groups must be able to: + * - run at the same time as themselves (reentrant) + * - run at the same time as other callbacks in their group + * - run at the same time as other callbacks in other groups + * + * Callbacks in Mutually Exclusive Callback Groups: + * - will not be run multiple times simultaneously (non-reentrant) + * - will not be run at the same time as other callbacks in their group + * - but must run at the same time as callbacks in other groups + * + * Additiionally, callback groups have a property which determines whether or + * not they are added to an executor with their associated node automatically. + * When creating a callback group the automatically_add_to_executor_with_node + * argument determines this behavior, and if true it will cause the newly + * created callback group to be added to an executor with the node when the + * Executor::add_node method is used. + * If false, this callback group will not be added automatically and would + * have to be added to an executor manually using the + * Executor::add_callback_group method. + * + * Whether the node was added to the executor before creating the callback + * group, or after, is irrelevant; the callback group will be automatically + * added to the executor in either case. + * + * \param[in] group_type They type of the callback group. + * \param[in] automatically_add_to_executor_with_node a boolean which + * determines whether a callback group is automatically added to an executor + * with the node with which it is associated. + */ RCLCPP_PUBLIC - explicit CallbackGroup(CallbackGroupType group_type); + explicit CallbackGroup( + CallbackGroupType group_type, + bool automatically_add_to_executor_with_node = true); template rclcpp::SubscriptionBase::SharedPtr @@ -102,6 +138,31 @@ class CallbackGroup const CallbackGroupType & type() const; + /// Return a reference to the 'associated with executor' atomic boolean. + /** + * When a callback group is added to an executor this boolean is checked + * to ensure it has not already been added to another executor. + * If it has not been, then this boolean is set to true to indicate it is + * now associated with an executor. + * + * When the callback group is removed from the executor, this atomic boolean + * is set back to false. + * + * \return the 'associated with executor' atomic boolean + */ + RCLCPP_PUBLIC + std::atomic_bool & + get_associated_with_executor_atomic(); + + /// Return true if this callback group should be automatically added to an executor by the node. + /** + * \return boolean true if this callback group should be automatically added + * to an executor when the associated node is added, otherwise false. + */ + RCLCPP_PUBLIC + bool + automatically_add_to_executor_with_node() const; + protected: RCLCPP_DISABLE_COPY(CallbackGroup) @@ -136,12 +197,14 @@ class CallbackGroup CallbackGroupType type_; // Mutex to protect the subsequent vectors of pointers. mutable std::mutex mutex_; + std::atomic_bool associated_with_executor_; std::vector subscription_ptrs_; std::vector timer_ptrs_; std::vector service_ptrs_; std::vector client_ptrs_; std::vector waitable_ptrs_; std::atomic_bool can_be_taken_from_; + const bool automatically_add_to_executor_with_node_; private: template diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index a4c9f0b943..4af227192b 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,10 @@ namespace rclcpp { +typedef std::map> WeakCallbackGroupsToNodesMap; + // Forward declaration is used in convenience method signature. class Node; @@ -76,13 +81,118 @@ class Executor virtual void spin() = 0; + /// Add a callback group to an executor. + /** + * An executor can have zero or more callback groups which provide work during `spin` functions. + * When an executor attempts to add a callback group, the executor checks to see if it is already + * associated with another executor, and if it has been, then an exception is thrown. + * Otherwise, the callback group is added to the executor. + * + * Adding a callback group with this method does not associate its node with this executor + * in any way + * + * \param[in] group_ptr a shared ptr that points to a callback group + * \param[in] node_ptr a shared pointer that points to a node base interface + * \param[in] notify True to trigger the interrupt guard condition during this function. If + * the executor is blocked at the rmw layer while waiting for work and it is notified that a new + * callback group was added, it will wake up. + * \throw std::runtime_error if the callback group is associated to an executor + */ + RCLCPP_PUBLIC + virtual void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true); + + /// Get callback groups that belong to executor. + /** + * This function returns a vector of weak pointers that point to callback groups that were + * associated with the executor. + * The callback groups associated with this executor may have been added with + * `add_callback_group`, or added when a node was added to the executor with `add_node`, or + * automatically added when it created by a node already associated with this executor and the + * automatically_add_to_executor_with_node parameter was true. + * + * \return a vector of weak pointers that point to callback groups that are associated with + * the executor + */ + RCLCPP_PUBLIC + virtual std::vector + get_all_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * This function returns a vector of weak pointers that point to callback groups that were + * associated with the executor. + * The callback groups associated with this executor have been added with + * `add_callback_group`. + * + * \return a vector of weak pointers that point to callback groups that are associated with + * the executor + */ + RCLCPP_PUBLIC + virtual std::vector + get_manually_added_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * This function returns a vector of weak pointers that point to callback groups that were + * added from a node that is associated with the executor. + * The callback groups are added when a node is added to the executor with `add_node`, or + * automatically if they are created in the future by that node and have the + * automatically_add_to_executor_with_node argument set to true. + * + * \return a vector of weak pointers that point to callback groups from a node associated with + * the executor + */ + RCLCPP_PUBLIC + virtual std::vector + get_automatically_added_callback_groups_from_nodes(); + + /// Remove a callback group from the executor. + /** + * The callback group is removed from and disassociated with the executor. + * If the callback group removed was the last callback group from the node + * that is associated with the executor, the interrupt guard condition + * is triggered and node's guard condition is removed from the executor. + * + * This function only removes a callback group that was manually added with + * rclcpp::Executor::add_callback_group. + * To remove callback groups that were added from a node using + * rclcpp::Executor::add_node, use rclcpp::Executor::remove_node instead. + * + * \param[in] group_ptr Shared pointer to the callback group to be added. + * \param[in] notify True to trigger the interrupt guard condition during this function. If + * the executor is blocked at the rmw layer while waiting for work and it is notified that a + * callback group was removed, it will wake up. + * \throw std::runtime_error if node is deleted before callback group + * \throw std::runtime_error if the callback group is not associated with the executor + */ + RCLCPP_PUBLIC + virtual void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true); + /// Add a node to the executor. /** - * An executor can have zero or more nodes which provide work during `spin` functions. + * Nodes have associated callback groups, and this method adds any of those callback groups + * to this executor which have their automatically_add_to_executor_with_node parameter true. + * The node is also associated with the executor so that future callback groups which are + * created on the node with the automatically_add_to_executor_with_node parameter set to true + * are also automatically associated with this executor. + * + * Callback groups with the automatically_add_to_executor_with_node parameter set to false must + * be manually added to an executor using the rclcpp::Executor::add_callback_group method. + * + * If a node is already associated with an executor, this method throws an exception. + * * \param[in] node_ptr Shared pointer to the node to be added. * \param[in] notify True to trigger the interrupt guard condition during this function. If * the executor is blocked at the rmw layer while waiting for work and it is notified that a new * node was added, it will wake up. + * \throw std::runtime_error if a node is already associated to an executor */ RCLCPP_PUBLIC virtual void @@ -98,10 +208,19 @@ class Executor /// Remove a node from the executor. /** + * Any callback groups automatically added when this node was added with + * rclcpp::Executor::add_node are automatically removed, and the node is no longer associated + * with this executor. + * + * This also means that future callback groups created by the given node are no longer + * automatically added to this executor. + * * \param[in] node_ptr Shared pointer to the node to remove. * \param[in] notify True to trigger the interrupt guard condition and wake up the executor. * This is useful if the last node was removed from the executor while the executor was blocked * waiting for work in another thread, because otherwise the executor would never be notified. + * \throw std::runtime_error if the node is not associated with an executor. + * \throw std::runtime_error if the node is not associated with this executor. */ RCLCPP_PUBLIC virtual void @@ -327,22 +446,79 @@ class Executor RCLCPP_PUBLIC rclcpp::node_interfaces::NodeBaseInterface::SharedPtr - get_node_by_group(rclcpp::CallbackGroup::SharedPtr group); + get_node_by_group( + WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + rclcpp::CallbackGroup::SharedPtr group); + + /// Return true if the node has been added to this executor. + /** + * \param[in] node_ptr a shared pointer that points to a node base interface + * \return true if the node is associated with the executor, otherwise false + */ + RCLCPP_PUBLIC + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const; RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr get_group_by_timer(rclcpp::TimerBase::SharedPtr timer); + /// Add a callback group to an executor + /** + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + virtual void + add_callback_group_to_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + bool notify = true); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + virtual void + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + bool notify = true); + RCLCPP_PUBLIC bool get_next_ready_executable(AnyExecutable & any_executable); + RCLCPP_PUBLIC + bool + get_next_ready_executable_from_map( + AnyExecutable & any_executable, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes); + RCLCPP_PUBLIC bool get_next_executable( AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Add all callback groups that can be automatically added from associated nodes. + /** + * The executor, before collecting entities, verifies if any callback group from + * nodes associated with the executor, which is not already associated to an executor, + * can be automatically added to this executor. + * This takes care of any callback group that has been added to a node but not explicitly added + * to the executor. + * It is important to note that in order for the callback groups to be automatically added to an + * executor through this function, the node of the callback groups needs to have been added + * through the `add_node` method. + */ + RCLCPP_PUBLIC + virtual void + add_callback_groups_from_nodes_associated_to_executor(); + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; @@ -367,8 +543,25 @@ class Executor void spin_once_impl(std::chrono::nanoseconds timeout); + typedef std::map> + WeakNodesToGuardConditionsMap; + + /// maps nodes to guard conditions + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + + /// maps callback groups associated to nodes + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + + /// maps callback groups to nodes associated with executor + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + /// maps all callback groups to nodes + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_; + + /// nodes that are associated with the executor std::list weak_nodes_; - std::list guard_conditions_; }; namespace executor diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp index 0df25fde9d..a2dd148d01 100644 --- a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -17,7 +17,9 @@ #include #include +#include #include +#include #include "rcl/guard_condition.h" #include "rcl/wait.h" @@ -32,6 +34,9 @@ namespace rclcpp { namespace executors { +typedef std::map> WeakCallbackGroupsToNodesMap; class StaticExecutorEntitiesCollector final : public rclcpp::Waitable, @@ -86,7 +91,7 @@ class StaticExecutorEntitiesCollector final /** * block until the wait set is ready or until the timeout has been exceeded. * \throws std::runtime_error if wait set couldn't be cleared or filled. - * \throws any rcl errors from rcl_wait, \sa rclcpp::exceptions::throw_from_rcl_error() + * \throws any rcl errors from rcl_wait, \see rclcpp::exceptions::throw_from_rcl_error() */ RCLCPP_PUBLIC void @@ -103,17 +108,58 @@ class StaticExecutorEntitiesCollector final size_t get_number_of_ready_guard_conditions() override; + /// Add a callback group to an executor. /** - * \sa rclcpp::Executor::add_node() + * \see rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Add a callback group to an executor. + /** + * \see rclcpp::Executor::add_callback_group + * \return boolean whether the node from the callback group is new + */ + RCLCPP_PUBLIC + bool + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + bool + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr); + + /// Remove a callback group from the executor. + /** + * \see rclcpp::Executor::remove_callback_group_from_map + */ + RCLCPP_PUBLIC + bool + remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); + + /** + * \see rclcpp::Executor::add_node() * \throw std::runtime_error if node was already added */ RCLCPP_PUBLIC - void + bool add_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); /** - * \sa rclcpp::Executor::remove_node() + * \see rclcpp::Executor::remove_node() * \throw std::runtime_error if no guard condition is associated with node. */ RCLCPP_PUBLIC @@ -121,6 +167,26 @@ class StaticExecutorEntitiesCollector final remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + RCLCPP_PUBLIC + std::vector + get_all_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups(); + + /// Get callback groups that belong to executor. + /** + * \see rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes(); + /// Complete all available queued work without blocking. /** * This function checks if after the guard condition was triggered @@ -216,13 +282,45 @@ class StaticExecutorEntitiesCollector final rclcpp::Waitable::SharedPtr get_waitable(size_t i) {return exec_list_.waitable[i];} + /// Return true if the node belongs to the collector + /** + * \param[in] group_ptr a node base interface shared pointer + * \return boolean whether a node belongs the collector + */ + bool + has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const; + private: - /// Nodes guard conditions which trigger this waitable - std::list guard_conditions_; + /// Add all callback groups that can be automatically added by any executor + /// and is not already associated with an executor from nodes + /// that are associated with executor + /** + * \see rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor() + */ + RCLCPP_PUBLIC + void + add_callback_groups_from_nodes_associated_to_executor(); + + RCLCPP_PUBLIC + void + fill_executable_list_from_map(WeakCallbackGroupsToNodesMap weak_groups_to_nodes); /// Memory strategy: an interface for handling user-defined memory allocation strategies. rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_associated_with_executor_to_nodes_; + // maps callback groups to nodes. + WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_; + + typedef std::map> + WeakNodesToGuardConditionsMap; + WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_; + /// List of weak nodes registered in the static executor std::list weak_nodes_; diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index c78259ca75..d286ee150b 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -78,15 +78,30 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor void spin() override; + /// Add a callback group to an executor. + /** + * \sa rclcpp::Executor::add_callback_group + */ + RCLCPP_PUBLIC + void + add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Remove callback group from the executor + /** + * \sa rclcpp::Executor::remove_callback_group + */ + RCLCPP_PUBLIC + void + remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify = true) override; + /// Add a node to the executor. /** - * An executor can have zero or more nodes which provide work during `spin` functions. - * \param[in] node_ptr Shared pointer to the node to be added. - * \param[in] notify True to trigger the interrupt guard condition during this function. If - * the executor is blocked at the rmw layer while waiting for work and it is notified that a new - * node was added, it will wake up. - * \throw std::runtime_error if node was already added or if rcl_trigger_guard_condition - * return an error + * \sa rclcpp::Executor::add_node */ RCLCPP_PUBLIC void @@ -96,8 +111,7 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor /// Convenience function which takes Node and forwards NodeBaseInterface. /** - * \throw std::runtime_error if node was already added or if rcl_trigger_guard_condition - * returns an error + * \sa rclcpp::StaticSingleThreadedExecutor::add_node */ RCLCPP_PUBLIC void @@ -105,11 +119,7 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor /// Remove a node from the executor. /** - * \param[in] node_ptr Shared pointer to the node to remove. - * \param[in] notify True to trigger the interrupt guard condition and wake up the executor. - * This is useful if the last node was removed from the executor while the executor was blocked - * waiting for work in another thread, because otherwise the executor would never be notified. - * \throw std::runtime_error if rcl_trigger_guard_condition returns an error + * \sa rclcpp::Executor::remove_node */ RCLCPP_PUBLIC void @@ -119,12 +129,32 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor /// Convenience function which takes Node and forwards NodeBaseInterface. /** - * \throw std::runtime_error if rcl_trigger_guard_condition returns an error + * \sa rclcpp::Executor::remove_node */ RCLCPP_PUBLIC void remove_node(std::shared_ptr node_ptr, bool notify = true) override; + RCLCPP_PUBLIC + std::vector + get_all_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_manually_added_callback_groups() + */ + RCLCPP_PUBLIC + std::vector + get_manually_added_callback_groups() override; + + /// Get callback groups that belong to executor. + /** + * \sa rclcpp::Executor::get_automatically_added_callback_groups_from_nodes() + */ + RCLCPP_PUBLIC + std::vector + get_automatically_added_callback_groups_from_nodes() override; + /// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted. /** * \param[in] future The future to wait on. If this function returns SUCCESS, the future can be diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 5677533a4a..da79643e4e 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -16,6 +16,7 @@ #define RCLCPP__MEMORY_STRATEGY_HPP_ #include +#include #include #include "rcl/allocator.h" @@ -42,11 +43,13 @@ class RCLCPP_PUBLIC MemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy) - using WeakNodeList = std::list; + using WeakCallbackGroupsToNodesMap = std::map>; virtual ~MemoryStrategy() = default; - virtual bool collect_entities(const WeakNodeList & weak_nodes) = 0; + virtual bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_services() const = 0; @@ -68,27 +71,27 @@ class RCLCPP_PUBLIC MemoryStrategy virtual void get_next_subscription( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual void get_next_service( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual void get_next_client( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual void get_next_timer( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual void get_next_waitable( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) = 0; + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) = 0; virtual rcl_allocator_t get_allocator() = 0; @@ -96,52 +99,52 @@ class RCLCPP_PUBLIC MemoryStrategy static rclcpp::SubscriptionBase::SharedPtr get_subscription_by_handle( std::shared_ptr subscriber_handle, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::ServiceBase::SharedPtr get_service_by_handle( std::shared_ptr service_handle, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::ClientBase::SharedPtr get_client_by_handle( std::shared_ptr client_handle, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::TimerBase::SharedPtr get_timer_by_handle( std::shared_ptr timer_handle, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group( rclcpp::CallbackGroup::SharedPtr group, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_subscription( rclcpp::SubscriptionBase::SharedPtr subscription, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_service( rclcpp::ServiceBase::SharedPtr service, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_client( rclcpp::ClientBase::SharedPtr client, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_timer( rclcpp::TimerBase::SharedPtr timer, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_waitable( rclcpp::Waitable::SharedPtr waitable, - const WeakNodeList & weak_nodes); + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); }; } // namespace memory_strategy diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 8e8bfcb591..6bafd80b26 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -141,7 +141,9 @@ class Node : public std::enable_shared_from_this /// Create and return a callback group. RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr - create_callback_group(rclcpp::CallbackGroupType group_type); + create_callback_group( + rclcpp::CallbackGroupType group_type, + bool automatically_add_to_executor_with_node = true); /// Return the list of callback groups in the node. RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index 4386ee5e55..5e7f8b7d35 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -83,7 +83,9 @@ class NodeBase : public NodeBaseInterface RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr - create_callback_group(rclcpp::CallbackGroupType group_type) override; + create_callback_group( + rclcpp::CallbackGroupType group_type, + bool automatically_add_to_executor_with_node = true) override; RCLCPP_PUBLIC rclcpp::CallbackGroup::SharedPtr diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index bcb469c0bf..840159aaba 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -106,7 +106,9 @@ class NodeBaseInterface RCLCPP_PUBLIC virtual rclcpp::CallbackGroup::SharedPtr - create_callback_group(rclcpp::CallbackGroupType group_type) = 0; + create_callback_group( + rclcpp::CallbackGroupType group_type, + bool automatically_add_to_executor_with_node = true) = 0; /// Return the default callback group. RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index 4cad8d548b..76790b1245 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -150,48 +150,47 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy ); } - bool collect_entities(const WeakNodeList & weak_nodes) override + bool collect_entities(const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { - bool has_invalid_weak_nodes = false; - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - has_invalid_weak_nodes = true; + bool has_invalid_weak_groups_or_nodes = false; + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (group == nullptr || node == nullptr) { + has_invalid_weak_groups_or_nodes = true; continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - group->find_subscription_ptrs_if( - [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - subscription_handles_.push_back(subscription->get_subscription_handle()); - return false; - }); - group->find_service_ptrs_if( - [this](const rclcpp::ServiceBase::SharedPtr & service) { - service_handles_.push_back(service->get_service_handle()); - return false; - }); - group->find_client_ptrs_if( - [this](const rclcpp::ClientBase::SharedPtr & client) { - client_handles_.push_back(client->get_client_handle()); - return false; - }); - group->find_timer_ptrs_if( - [this](const rclcpp::TimerBase::SharedPtr & timer) { - timer_handles_.push_back(timer->get_timer_handle()); - return false; - }); - group->find_waitable_ptrs_if( - [this](const rclcpp::Waitable::SharedPtr & waitable) { - waitable_handles_.push_back(waitable); - return false; - }); + if (!group || !group->can_be_taken_from().load()) { + continue; } + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + subscription_handles_.push_back(subscription->get_subscription_handle()); + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + service_handles_.push_back(service->get_service_handle()); + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + client_handles_.push_back(client->get_client_handle()); + return false; + }); + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + timer_handles_.push_back(timer->get_timer_handle()); + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + waitable_handles_.push_back(waitable); + return false; + }); } - return has_invalid_weak_nodes; + + return has_invalid_weak_groups_or_nodes; } void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) override @@ -264,14 +263,14 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy void get_next_subscription( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) override + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { auto it = subscription_handles_.begin(); while (it != subscription_handles_.end()) { - auto subscription = get_subscription_by_handle(*it, weak_nodes); + auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes); if (subscription) { // Find the group for this handle and see if it can be serviced - auto group = get_group_by_subscription(subscription, weak_nodes); + auto group = get_group_by_subscription(subscription, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the subscription is not valid... // Remove it from the ready list and continue looking @@ -287,7 +286,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy // Otherwise it is safe to set and return the any_exec any_exec.subscription = subscription; any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); + any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); subscription_handles_.erase(it); return; } @@ -299,14 +298,14 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy void get_next_service( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) override + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { auto it = service_handles_.begin(); while (it != service_handles_.end()) { - auto service = get_service_by_handle(*it, weak_nodes); + auto service = get_service_by_handle(*it, weak_groups_to_nodes); if (service) { // Find the group for this handle and see if it can be serviced - auto group = get_group_by_service(service, weak_nodes); + auto group = get_group_by_service(service, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the service is not valid... // Remove it from the ready list and continue looking @@ -322,7 +321,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy // Otherwise it is safe to set and return the any_exec any_exec.service = service; any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); + any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); service_handles_.erase(it); return; } @@ -332,14 +331,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } void - get_next_client(rclcpp::AnyExecutable & any_exec, const WeakNodeList & weak_nodes) override + get_next_client( + rclcpp::AnyExecutable & any_exec, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { auto it = client_handles_.begin(); while (it != client_handles_.end()) { - auto client = get_client_by_handle(*it, weak_nodes); + auto client = get_client_by_handle(*it, weak_groups_to_nodes); if (client) { // Find the group for this handle and see if it can be serviced - auto group = get_group_by_client(client, weak_nodes); + auto group = get_group_by_client(client, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the service is not valid... // Remove it from the ready list and continue looking @@ -355,7 +356,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy // Otherwise it is safe to set and return the any_exec any_exec.client = client; any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); + any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); client_handles_.erase(it); return; } @@ -367,14 +368,14 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy void get_next_timer( rclcpp::AnyExecutable & any_exec, - const WeakNodeList & weak_nodes) override + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { auto it = timer_handles_.begin(); while (it != timer_handles_.end()) { - auto timer = get_timer_by_handle(*it, weak_nodes); + auto timer = get_timer_by_handle(*it, weak_groups_to_nodes); if (timer) { // Find the group for this handle and see if it can be serviced - auto group = get_group_by_timer(timer, weak_nodes); + auto group = get_group_by_timer(timer, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the timer is not valid... // Remove it from the ready list and continue looking @@ -390,7 +391,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy // Otherwise it is safe to set and return the any_exec any_exec.timer = timer; any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); + any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); timer_handles_.erase(it); return; } @@ -400,14 +401,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } void - get_next_waitable(rclcpp::AnyExecutable & any_exec, const WeakNodeList & weak_nodes) override + get_next_waitable( + rclcpp::AnyExecutable & any_exec, + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) override { auto it = waitable_handles_.begin(); while (it != waitable_handles_.end()) { auto waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced - auto group = get_group_by_waitable(waitable, weak_nodes); + auto group = get_group_by_waitable(waitable, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the waitable is not valid... // Remove it from the ready list and continue looking @@ -423,7 +426,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy // Otherwise it is safe to set and return the any_exec any_exec.waitable = waitable; any_exec.callback_group = group; - any_exec.node_base = get_node_by_group(group, weak_nodes); + any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); waitable_handles_.erase(it); return; } diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index db77e46b1e..6b38578ba9 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -19,8 +19,12 @@ using rclcpp::CallbackGroup; using rclcpp::CallbackGroupType; -CallbackGroup::CallbackGroup(CallbackGroupType group_type) -: type_(group_type), can_be_taken_from_(true) +CallbackGroup::CallbackGroup( + CallbackGroupType group_type, + bool automatically_add_to_executor_with_node) +: type_(group_type), associated_with_executor_(false), + can_be_taken_from_(true), + automatically_add_to_executor_with_node_(automatically_add_to_executor_with_node) {} @@ -36,6 +40,18 @@ CallbackGroup::type() const return type_; } +std::atomic_bool & +CallbackGroup::get_associated_with_executor_atomic() +{ + return associated_with_executor_; +} + +bool +CallbackGroup::automatically_add_to_executor_with_node() const +{ + return automatically_add_to_executor_with_node_; +} + void CallbackGroup::add_subscription( const rclcpp::SubscriptionBase::SharedPtr subscription_ptr) diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 5aac2284ed..f380b99b14 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -14,8 +14,11 @@ #include #include +#include #include #include +#include +#include #include "rcl/allocator.h" #include "rcl/error_handling.h" @@ -82,19 +85,33 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) Executor::~Executor() { - // Disassocate all nodes - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node) { - std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + // Disassocate all callback groups. + for (auto & pair : weak_groups_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); has_executor.store(false); } } + // Disassocate all nodes. + std::for_each( + weak_nodes_.begin(), weak_nodes_.end(), [] + (rclcpp::node_interfaces::NodeBaseInterface::WeakPtr weak_node_ptr) { + auto shared_node_ptr = weak_node_ptr.lock(); + if (shared_node_ptr) { + std::atomic_bool & has_executor = shared_node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } + }); weak_nodes_.clear(); - for (auto & guard_condition : guard_conditions_) { + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); + weak_groups_to_nodes_.clear(); + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto & guard_condition = pair.second; memory_strategy_->remove_guard_condition(guard_condition); } - guard_conditions_.clear(); + weak_nodes_to_guard_conditions_.clear(); // Finalize the wait set. if (rcl_wait_set_fini(&wait_set_) != RCL_RET_OK) { @@ -115,6 +132,116 @@ Executor::~Executor() context_->release_interrupt_guard_condition(&wait_set_, std::nothrow); } +std::vector +Executor::get_all_callback_groups() +{ + std::vector groups; + for (const auto & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +Executor::get_manually_added_callback_groups() +{ + std::vector groups; + for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +Executor::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +void +Executor::add_callback_groups_from_nodes_associated_to_executor() +{ + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + auto group_ptrs = node->get_callback_groups(); + std::for_each( + group_ptrs.begin(), group_ptrs.end(), + [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + { + auto shared_group_ptr = group_ptr.lock(); + if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + this->add_callback_group_to_map( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_, + true); + } + }); + } + } +} + +void +Executor::add_callback_group_to_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + bool notify) +{ + // If the callback_group already has an executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + bool is_new_node = !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) && + !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_); + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto insert_info = + weak_groups_to_nodes.insert(std::make_pair(weak_group_ptr, node_ptr)); + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + // Also add to the map that contains all callback groups + weak_groups_to_nodes_.insert(std::make_pair(weak_group_ptr, node_ptr)); + if (is_new_node) { + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); + weak_nodes_to_guard_conditions_[node_weak_ptr] = node_ptr->get_notify_guard_condition(); + if (notify) { + // Interrupt waiting to handle new node + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } + } + // Add the node's notify condition to the guard condition handles + std::unique_lock lock(memory_strategy_mutex_); + memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); + } +} + +void +Executor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) +{ + this->add_callback_group_to_map( + group_ptr, + node_ptr, + weak_groups_associated_with_executor_to_nodes_, + notify); +} + void Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { @@ -123,25 +250,67 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt if (has_executor.exchange(true)) { throw std::runtime_error("Node has already been added to an executor."); } - // Check to ensure node not already added - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node == node_ptr) { - // TODO(jacquelinekay): Use a different error here? - throw std::runtime_error("Cannot add node to executor, node already added."); + for (auto & weak_group : node_ptr->get_callback_groups()) { + auto group_ptr = weak_group.lock(); + if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + this->add_callback_group_to_map( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_, + notify); } } weak_nodes_.push_back(node_ptr); - guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); - if (notify) { - // Interrupt waiting to handle new node - if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { - throw std::runtime_error(rcl_get_error_string().str); +} + +void +Executor::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes, + bool notify) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter != weak_groups_to_nodes.end()) { + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + weak_groups_to_nodes.erase(iter); + weak_groups_to_nodes_.erase(group_ptr); + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + } else { + throw std::runtime_error("Callback group needs to be associated with executor."); + } + // If the node was matched and removed, interrupt waiting. + if (!has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_) && + !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_)) + { + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); + weak_nodes_to_guard_conditions_.erase(node_weak_ptr); + if (notify) { + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } } + std::unique_lock lock(memory_strategy_mutex_); + memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } - // Add the node's notify condition to the guard condition handles - std::unique_lock lock(memory_strategy_mutex_); - memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); +} + +void +Executor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify) +{ + this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_, + notify); } void @@ -153,34 +322,47 @@ Executor::add_node(std::shared_ptr node_ptr, bool notify) void Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { - bool node_removed = false; - { - auto node_it = weak_nodes_.begin(); - auto gc_it = guard_conditions_.begin(); - while (node_it != weak_nodes_.end()) { - bool matched = (node_it->lock() == node_ptr); - if (matched) { - node_it = weak_nodes_.erase(node_it); - gc_it = guard_conditions_.erase(gc_it); - node_removed = true; - } else { - ++node_it; - ++gc_it; - } + if (!node_ptr->get_associated_with_executor_atomic().load()) { + throw std::runtime_error("Node needs to be associated with an executor."); + } + + bool found_node = false; + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + found_node = true; + node_it = weak_nodes_.erase(node_it); + } else { + ++node_it; } } + if (!found_node) { + throw std::runtime_error("Node needs to be associated with this executor."); + } + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this, notify] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_, + notify); + }); std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); has_executor.store(false); - if (notify) { - // If the node was matched and removed, interrupt waiting - if (node_removed) { - if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { - throw std::runtime_error(rcl_get_error_string().str); - } - } - } - std::unique_lock lock(memory_strategy_mutex_); - memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } void @@ -473,25 +655,46 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) { std::unique_lock lock(memory_strategy_mutex_); + // Check weak_nodes_ to find any callback group that is not owned + // by an executor and add it to the list of callbackgroups for + // collect entities. Also exchange to false so it is not + // allowed to add to another executor + add_callback_groups_from_nodes_associated_to_executor(); + // Collect the subscriptions and timers to be waited on memory_strategy_->clear_handles(); - bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); - - // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_nodes) { - auto node_it = weak_nodes_.begin(); - auto gc_it = guard_conditions_.begin(); - while (node_it != weak_nodes_.end()) { - if (node_it->expired()) { - node_it = weak_nodes_.erase(node_it); - memory_strategy_->remove_guard_condition(*gc_it); - gc_it = guard_conditions_.erase(gc_it); - } else { - ++node_it; - ++gc_it; + bool has_invalid_weak_groups_or_nodes = + memory_strategy_->collect_entities(weak_groups_to_nodes_); + + if (has_invalid_weak_groups_or_nodes) { + std::vector invalid_group_ptrs; + for (auto pair : weak_groups_to_nodes_) { + auto weak_group_ptr = pair.first; + auto weak_node_ptr = pair.second; + if (weak_group_ptr.expired() || weak_node_ptr.expired()) { + invalid_group_ptrs.push_back(weak_group_ptr); + auto node_guard_pair = weak_nodes_to_guard_conditions_.find(weak_node_ptr); + weak_nodes_to_guard_conditions_.erase(weak_node_ptr); + memory_strategy_->remove_guard_condition(node_guard_pair->second); } } + std::for_each( + invalid_group_ptrs.begin(), invalid_group_ptrs.end(), + [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { + if (weak_groups_to_nodes_associated_with_executor_.find(group_ptr) != + weak_groups_to_nodes_associated_with_executor_.end()) + { + weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); + } + if (weak_groups_associated_with_executor_to_nodes_.find(group_ptr) != + weak_groups_associated_with_executor_to_nodes_.end()) + { + weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); + } + weak_groups_to_nodes_.erase(group_ptr); + }); } + // clear wait set if (rcl_wait_set_clear(&wait_set_) != RCL_RET_OK) { throw std::runtime_error("Couldn't clear wait set"); @@ -512,6 +715,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) throw std::runtime_error("Couldn't fill wait set"); } } + rcl_ret_t status = rcl_wait(&wait_set_, std::chrono::duration_cast(timeout).count()); if (status == RCL_RET_WAIT_SET_EMPTY) { @@ -529,22 +733,18 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) } rclcpp::node_interfaces::NodeBaseInterface::SharedPtr -Executor::get_node_by_group(rclcpp::CallbackGroup::SharedPtr group) +Executor::get_node_by_group( + WeakCallbackGroupsToNodesMap weak_groups_to_nodes, + rclcpp::CallbackGroup::SharedPtr group) { if (!group) { return nullptr; } - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto callback_group = weak_group.lock(); - if (callback_group == group) { - return node; - } - } + rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group); + const auto finder = weak_groups_to_nodes.find(weak_group_ptr); + if (finder != weak_groups_to_nodes.end()) { + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock(); + return node_ptr; } return nullptr; } @@ -552,80 +752,103 @@ Executor::get_node_by_group(rclcpp::CallbackGroup::SharedPtr group) rclcpp::CallbackGroup::SharedPtr Executor::get_group_by_timer(rclcpp::TimerBase::SharedPtr timer) { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (!group) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { - return timer_ptr == timer; - }); - if (timer_ref) { - return group; - } + auto timer_ref = group->find_timer_ptrs_if( + [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { + return timer_ptr == timer; + }); + if (timer_ref) { + return group; } } - return rclcpp::CallbackGroup::SharedPtr(); + + for (const auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (!group) { + continue; + } + auto timer_ref = group->find_timer_ptrs_if( + [timer](const rclcpp::TimerBase::SharedPtr & timer_ptr) -> bool { + return timer_ptr == timer; + }); + if (timer_ref) { + return group; + } + } + return nullptr; } bool Executor::get_next_ready_executable(AnyExecutable & any_executable) +{ + bool success = get_next_ready_executable_from_map(any_executable, weak_groups_to_nodes_); + return success; +} + +bool +Executor::get_next_ready_executable_from_map( + AnyExecutable & any_executable, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) { bool success = false; // Check the timers to see if there are any that are ready - memory_strategy_->get_next_timer(any_executable, weak_nodes_); + memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); if (any_executable.timer) { success = true; } if (!success) { // Check the subscriptions to see if there are any that are ready - memory_strategy_->get_next_subscription(any_executable, weak_nodes_); + memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes); if (any_executable.subscription) { success = true; } } if (!success) { // Check the services to see if there are any that are ready - memory_strategy_->get_next_service(any_executable, weak_nodes_); + memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes); if (any_executable.service) { success = true; } } if (!success) { // Check the clients to see if there are any that are ready - memory_strategy_->get_next_client(any_executable, weak_nodes_); + memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes); if (any_executable.client) { success = true; } } if (!success) { // Check the waitables to see if there are any that are ready - memory_strategy_->get_next_waitable(any_executable, weak_nodes_); + memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); if (any_executable.waitable) { success = true; } } - // At this point any_exec should be valid with either a valid subscription + // At this point any_executable should be valid with either a valid subscription // or a valid timer, or it should be a null shared_ptr + if (success) { + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group; + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter == weak_groups_to_nodes.end()) { + success = false; + } + } + if (success) { // If it is valid, check to see if the group is mutually exclusive or - // not, then mark it accordingly - using callback_group::CallbackGroupType; - if ( - any_executable.callback_group && - any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) + // not, then mark it accordingly ..Check if the callback_group belongs to this executor + if (any_executable.callback_group && any_executable.callback_group->type() == \ + CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise assert(any_executable.callback_group->can_be_taken_from().load()); // Set to false to indicate something is being run from this group - // This is reset to true either when the any_exec is executed or when the - // any_exec is destructued + // This is reset to true either when the any_executable is executed or when the + // any_executable is destructued any_executable.callback_group->can_be_taken_from().store(false); } } @@ -652,3 +875,18 @@ Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanos } return success; } + +// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. +bool +Executor::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp index 5461e665ab..7e2b505509 100644 --- a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include "rclcpp/memory_strategy.hpp" #include "rclcpp/executors/static_single_threaded_executor.hpp" @@ -26,6 +28,21 @@ using rclcpp::executors::StaticExecutorEntitiesCollector; StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() { + // Disassocate all callback groups and thus nodes. + for (auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + for (auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto group = pair.first.lock(); + if (group) { + std::atomic_bool & has_executor = group->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } // Disassociate all nodes for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -34,9 +51,11 @@ StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() has_executor.store(false); } } + weak_groups_associated_with_executor_to_nodes_.clear(); + weak_groups_to_nodes_associated_with_executor_.clear(); exec_list_.clear(); weak_nodes_.clear(); - guard_conditions_.clear(); + weak_nodes_to_guard_conditions_.clear(); } void @@ -56,8 +75,7 @@ StaticExecutorEntitiesCollector::init( memory_strategy_ = memory_strategy; // Add executor's guard condition - guard_conditions_.push_back(executor_guard_condition); - + memory_strategy_->add_guard_condition(executor_guard_condition); // Get memory strategy and executable list. Prepare wait_set_ execute(); } @@ -77,18 +95,41 @@ void StaticExecutorEntitiesCollector::fill_memory_strategy() { memory_strategy_->clear_handles(); - bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); - + bool has_invalid_weak_groups_or_nodes = + memory_strategy_->collect_entities(weak_groups_to_nodes_associated_with_executor_); // Clean up any invalid nodes, if they were detected - if (has_invalid_weak_nodes) { - auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { - if (node_it->expired()) { - node_it = weak_nodes_.erase(node_it); - } else { - ++node_it; + if (has_invalid_weak_groups_or_nodes) { + std::vector invalid_group_ptrs; + for (auto & pair : weak_groups_to_nodes_associated_with_executor_) { + auto & weak_group_ptr = pair.first; + auto & weak_node_ptr = pair.second; + if (weak_group_ptr.expired() || weak_node_ptr.expired()) { + invalid_group_ptrs.push_back(weak_group_ptr); } } + std::for_each( + invalid_group_ptrs.begin(), invalid_group_ptrs.end(), + [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { + weak_groups_to_nodes_associated_with_executor_.erase(group_ptr); + }); + } + has_invalid_weak_groups_or_nodes = + memory_strategy_->collect_entities(weak_groups_associated_with_executor_to_nodes_); + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_groups_or_nodes) { + std::vector invalid_group_ptrs; + for (auto & pair : weak_groups_associated_with_executor_to_nodes_) { + auto & weak_group_ptr = pair.first; + auto & weak_node_ptr = pair.second; + if (weak_group_ptr.expired() || weak_node_ptr.expired()) { + invalid_group_ptrs.push_back(weak_group_ptr); + } + } + std::for_each( + invalid_group_ptrs.begin(), invalid_group_ptrs.end(), + [this](rclcpp::CallbackGroup::WeakPtr group_ptr) { + weak_groups_associated_with_executor_to_nodes_.erase(group_ptr); + }); } // Add the static executor waitable to the memory strategy @@ -99,59 +140,58 @@ void StaticExecutorEntitiesCollector::fill_executable_list() { exec_list_.clear(); - - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { + add_callback_groups_from_nodes_associated_to_executor(); + fill_executable_list_from_map(weak_groups_associated_with_executor_to_nodes_); + fill_executable_list_from_map(weak_groups_to_nodes_associated_with_executor_); + // Add the executor's waitable to the executable list + exec_list_.add_waitable(shared_from_this()); +} +void +StaticExecutorEntitiesCollector::fill_executable_list_from_map( + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) +{ + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!node || !group || !group->can_be_taken_from().load()) { continue; } - // Check in all the callback groups - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - - group->find_timer_ptrs_if( - [this](const rclcpp::TimerBase::SharedPtr & timer) { - if (timer) { - exec_list_.add_timer(timer); - } - return false; - }); - group->find_subscription_ptrs_if( - [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { - if (subscription) { - exec_list_.add_subscription(subscription); - } - return false; - }); - group->find_service_ptrs_if( - [this](const rclcpp::ServiceBase::SharedPtr & service) { - if (service) { - exec_list_.add_service(service); - } - return false; - }); - group->find_client_ptrs_if( - [this](const rclcpp::ClientBase::SharedPtr & client) { - if (client) { - exec_list_.add_client(client); - } - return false; - }); - group->find_waitable_ptrs_if( - [this](const rclcpp::Waitable::SharedPtr & waitable) { - if (waitable) { - exec_list_.add_waitable(waitable); - } - return false; - }); - } + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + exec_list_.add_timer(timer); + } + return false; + }); + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + exec_list_.add_subscription(subscription); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + exec_list_.add_service(service); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + exec_list_.add_client(client); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + exec_list_.add_waitable(waitable); + } + return false; + }); } - - // Add the executor's waitable to the executable list - exec_list_.add_waitable(shared_from_this()); } void @@ -205,7 +245,8 @@ bool StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) { // Add waitable guard conditions (one for each registered node) into the wait set. - for (const auto & gc : guard_conditions_) { + for (const auto & pair : weak_nodes_to_guard_conditions_) { + auto & gc = pair.second; rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, gc, NULL); if (ret != RCL_RET_OK) { throw std::runtime_error("Executor waitable: couldn't add guard condition to wait set"); @@ -216,55 +257,152 @@ StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions() { - return guard_conditions_.size(); + return weak_nodes_to_guard_conditions_.size(); } -void +bool StaticExecutorEntitiesCollector::add_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) { - // Check to ensure node not already added - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (node == node_ptr) { - // TODO(jacquelinekay): Use a different error here? - throw std::runtime_error("Cannot add node to executor, node already added."); + bool is_new_node = false; + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + for (auto & weak_group : node_ptr->get_callback_groups()) { + auto group_ptr = weak_group.lock(); + if (group_ptr != nullptr && !group_ptr->get_associated_with_executor_atomic().load() && + group_ptr->automatically_add_to_executor_with_node()) + { + is_new_node = (add_callback_group( + group_ptr, + node_ptr, + weak_groups_to_nodes_associated_with_executor_) || + is_new_node); } } - weak_nodes_.push_back(node_ptr); - guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); + return is_new_node; +} + +bool +StaticExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + // If the callback_group already has an executor + std::atomic_bool & has_executor = group_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Callback group has already been added to an executor."); + } + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto insert_info = weak_groups_to_nodes.insert( + std::make_pair(weak_group_ptr, node_ptr)); + bool was_inserted = insert_info.second; + if (!was_inserted) { + throw std::runtime_error("Callback group was already added to executor."); + } + bool is_new_node = !has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_); + if (is_new_node) { + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); + weak_nodes_to_guard_conditions_[node_weak_ptr] = node_ptr->get_notify_guard_condition(); + return true; + } + return false; +} + +bool +StaticExecutorEntitiesCollector::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + return add_callback_group(group_ptr, node_ptr, weak_groups_associated_with_executor_to_nodes_); +} + +bool +StaticExecutorEntitiesCollector::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr) +{ + return this->remove_callback_group_from_map( + group_ptr, + weak_groups_associated_with_executor_to_nodes_); +} + +bool +StaticExecutorEntitiesCollector::remove_callback_group_from_map( + rclcpp::CallbackGroup::SharedPtr group_ptr, + WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) +{ + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr; + rclcpp::CallbackGroup::WeakPtr weak_group_ptr = group_ptr; + auto iter = weak_groups_to_nodes.find(weak_group_ptr); + if (iter != weak_groups_to_nodes.end()) { + node_ptr = iter->second.lock(); + if (node_ptr == nullptr) { + throw std::runtime_error("Node must not be deleted before its callback group(s)."); + } + weak_groups_to_nodes.erase(iter); + } else { + throw std::runtime_error("Callback group needs to be associated with executor."); + } + // If the node was matched and removed, interrupt waiting. + if (!has_node(node_ptr, weak_groups_associated_with_executor_to_nodes_) && + !has_node(node_ptr, weak_groups_to_nodes_associated_with_executor_)) + { + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr); + weak_nodes_to_guard_conditions_.erase(node_weak_ptr); + return true; + } + return false; } bool StaticExecutorEntitiesCollector::remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) { + if (!node_ptr->get_associated_with_executor_atomic().load()) { + return false; + } + bool node_found = false; auto node_it = weak_nodes_.begin(); - while (node_it != weak_nodes_.end()) { bool matched = (node_it->lock() == node_ptr); if (matched) { - // Find and remove node and its guard condition - auto gc_it = std::find( - guard_conditions_.begin(), - guard_conditions_.end(), - node_ptr->get_notify_guard_condition()); - - if (gc_it != guard_conditions_.end()) { - guard_conditions_.erase(gc_it); - weak_nodes_.erase(node_it); - return true; - } - - throw std::runtime_error("Didn't find guard condition associated with node."); - - } else { - ++node_it; + weak_nodes_.erase(node_it); + node_found = true; + break; } + ++node_it; } - - return false; + if (!node_found) { + return false; + } + std::vector found_group_ptrs; + std::for_each( + weak_groups_to_nodes_associated_with_executor_.begin(), + weak_groups_to_nodes_associated_with_executor_.end(), + [&found_group_ptrs, node_ptr](std::pair key_value_pair) { + auto & weak_node_ptr = key_value_pair.second; + auto shared_node_ptr = weak_node_ptr.lock(); + auto group_ptr = key_value_pair.first.lock(); + if (shared_node_ptr == node_ptr) { + found_group_ptrs.push_back(group_ptr); + } + }); + std::for_each( + found_group_ptrs.begin(), found_group_ptrs.end(), [this] + (rclcpp::CallbackGroup::SharedPtr group_ptr) { + this->remove_callback_group_from_map( + group_ptr, + weak_groups_to_nodes_associated_with_executor_); + }); + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); + return true; } bool @@ -273,13 +411,13 @@ StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) // Check wait_set guard_conditions for added/removed entities to/from a node for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { if (p_wait_set->guard_conditions[i] != NULL) { - // Check if the guard condition triggered belongs to a node - auto it = std::find( - guard_conditions_.begin(), guard_conditions_.end(), - p_wait_set->guard_conditions[i]); - - // If it does, we are ready to re-collect entities - if (it != guard_conditions_.end()) { + auto found_guard_condition = std::find_if( + weak_nodes_to_guard_conditions_.begin(), weak_nodes_to_guard_conditions_.end(), + [&](std::pair pair) -> bool { + return pair.second == p_wait_set->guard_conditions[i]; + }); + if (found_guard_condition != weak_nodes_to_guard_conditions_.end()) { return true; } } @@ -287,3 +425,76 @@ StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) // None of the guard conditions triggered belong to a registered node return false; } + +// Returns true iff the weak_groups_to_nodes map has node_ptr as the value in any of its entry. +bool +StaticExecutorEntitiesCollector::has_node( + const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + WeakCallbackGroupsToNodesMap weak_groups_to_nodes) const +{ + return std::find_if( + weak_groups_to_nodes.begin(), + weak_groups_to_nodes.end(), + [&](const WeakCallbackGroupsToNodesMap::value_type & other) -> bool { + auto other_ptr = other.second.lock(); + return other_ptr == node_ptr; + }) != weak_groups_to_nodes.end(); +} + +void +StaticExecutorEntitiesCollector::add_callback_groups_from_nodes_associated_to_executor() +{ + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + auto group_ptrs = node->get_callback_groups(); + std::for_each( + group_ptrs.begin(), group_ptrs.end(), + [this, node](rclcpp::CallbackGroup::WeakPtr group_ptr) + { + auto shared_group_ptr = group_ptr.lock(); + if (shared_group_ptr && shared_group_ptr->automatically_add_to_executor_with_node() && + !shared_group_ptr->get_associated_with_executor_atomic().load()) + { + add_callback_group( + shared_group_ptr, + node, + weak_groups_to_nodes_associated_with_executor_); + } + }); + } + } +} + +std::vector +StaticExecutorEntitiesCollector::get_all_callback_groups() +{ + std::vector groups; + for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +StaticExecutorEntitiesCollector::get_manually_added_callback_groups() +{ + std::vector groups; + for (auto const & group_node_ptr : weak_groups_associated_with_executor_to_nodes_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} + +std::vector +StaticExecutorEntitiesCollector::get_automatically_added_callback_groups_from_nodes() +{ + std::vector groups; + for (auto const & group_node_ptr : weak_groups_to_nodes_associated_with_executor_) { + groups.push_back(group_node_ptr.first); + } + return groups; +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 728a0902a1..772cd8e260 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -15,6 +15,7 @@ #include "rclcpp/executors/static_single_threaded_executor.hpp" #include +#include #include "rclcpp/scope_exit.hpp" @@ -50,23 +51,31 @@ StaticSingleThreadedExecutor::spin() } void -StaticSingleThreadedExecutor::add_node( - rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +StaticSingleThreadedExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify) { - // If the node already has an executor - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - if (has_executor.exchange(true)) { - throw std::runtime_error("Node has already been added to an executor."); + bool is_new_node = entities_collector_->add_callback_group(group_ptr, node_ptr); + if (is_new_node && notify) { + // Interrupt waiting to handle new node + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } } +} - if (notify) { +void +StaticSingleThreadedExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + bool is_new_node = entities_collector_->add_node(node_ptr); + if (is_new_node && notify) { // Interrupt waiting to handle new node if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { throw std::runtime_error(rcl_get_error_string().str); } } - - entities_collector_->add_node(node_ptr); } void @@ -75,23 +84,51 @@ StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, b this->add_node(node_ptr->get_node_base_interface(), notify); } +void +StaticSingleThreadedExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify) +{ + bool node_removed = entities_collector_->remove_callback_group(group_ptr); + // If the node was matched and removed, interrupt waiting + if (node_removed && notify) { + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } + } +} + void StaticSingleThreadedExecutor::remove_node( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) { bool node_removed = entities_collector_->remove_node(node_ptr); - + if (!node_removed) { + throw std::runtime_error("Node needs to be associated with this executor."); + } + // If the node was matched and removed, interrupt waiting if (notify) { - // If the node was matched and removed, interrupt waiting - if (node_removed) { - if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { - throw std::runtime_error(rcl_get_error_string().str); - } + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); } } +} + +std::vector +StaticSingleThreadedExecutor::get_all_callback_groups() +{ + return entities_collector_->get_all_callback_groups(); +} - std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); - has_executor.store(false); +std::vector +StaticSingleThreadedExecutor::get_manually_added_callback_groups() +{ + return entities_collector_->get_manually_added_callback_groups(); +} + +std::vector +StaticSingleThreadedExecutor::get_automatically_added_callback_groups_from_nodes() +{ + return entities_collector_->get_automatically_added_callback_groups_from_nodes(); } void diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index f3973ca9cc..bd32bdb169 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -20,25 +20,19 @@ using rclcpp::memory_strategy::MemoryStrategy; rclcpp::SubscriptionBase::SharedPtr MemoryStrategy::get_subscription_by_handle( std::shared_ptr subscriber_handle, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + if (!group) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto match_subscription = group->find_subscription_ptrs_if( - [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr & subscription) -> bool { - return subscription->get_subscription_handle() == subscriber_handle; - }); - if (match_subscription) { - return match_subscription; - } + auto match_subscription = group->find_subscription_ptrs_if( + [&subscriber_handle](const rclcpp::SubscriptionBase::SharedPtr & subscription) -> bool { + return subscription->get_subscription_handle() == subscriber_handle; + }); + if (match_subscription) { + return match_subscription; } } return nullptr; @@ -47,25 +41,19 @@ MemoryStrategy::get_subscription_by_handle( rclcpp::ServiceBase::SharedPtr MemoryStrategy::get_service_by_handle( std::shared_ptr service_handle, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + if (!group) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto service_ref = group->find_service_ptrs_if( - [&service_handle](const rclcpp::ServiceBase::SharedPtr & service) -> bool { - return service->get_service_handle() == service_handle; - }); - if (service_ref) { - return service_ref; - } + auto service_ref = group->find_service_ptrs_if( + [&service_handle](const rclcpp::ServiceBase::SharedPtr & service) -> bool { + return service->get_service_handle() == service_handle; + }); + if (service_ref) { + return service_ref; } } return nullptr; @@ -74,25 +62,19 @@ MemoryStrategy::get_service_by_handle( rclcpp::ClientBase::SharedPtr MemoryStrategy::get_client_by_handle( std::shared_ptr client_handle, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + if (!group) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto client_ref = group->find_client_ptrs_if( - [&client_handle](const rclcpp::ClientBase::SharedPtr & client) -> bool { - return client->get_client_handle() == client_handle; - }); - if (client_ref) { - return client_ref; - } + auto client_ref = group->find_client_ptrs_if( + [&client_handle](const rclcpp::ClientBase::SharedPtr & client) -> bool { + return client->get_client_handle() == client_handle; + }); + if (client_ref) { + return client_ref; } } return nullptr; @@ -101,25 +83,19 @@ MemoryStrategy::get_client_by_handle( rclcpp::TimerBase::SharedPtr MemoryStrategy::get_timer_by_handle( std::shared_ptr timer_handle, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + if (!group) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [&timer_handle](const rclcpp::TimerBase::SharedPtr & timer) -> bool { - return timer->get_timer_handle() == timer_handle; - }); - if (timer_ref) { - return timer_ref; - } + auto timer_ref = group->find_timer_ptrs_if( + [&timer_handle](const rclcpp::TimerBase::SharedPtr & timer) -> bool { + return timer->get_timer_handle() == timer_handle; + }); + if (timer_ref) { + return timer_ref; } } return nullptr; @@ -128,22 +104,17 @@ MemoryStrategy::get_timer_by_handle( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr MemoryStrategy::get_node_by_group( rclcpp::CallbackGroup::SharedPtr group, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { if (!group) { return nullptr; } - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto callback_group = weak_group.lock(); - if (callback_group == group) { - return node; - } - } + + rclcpp::CallbackGroup::WeakPtr weak_group_ptr(group); + const auto finder = weak_groups_to_nodes.find(weak_group_ptr); + if (finder != weak_groups_to_nodes.end()) { + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr = finder->second.lock(); + return node_ptr; } return nullptr; } @@ -151,25 +122,20 @@ MemoryStrategy::get_node_by_group( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_subscription( rclcpp::SubscriptionBase::SharedPtr subscription, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!group || !node) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto match_sub = group->find_subscription_ptrs_if( - [&subscription](const rclcpp::SubscriptionBase::SharedPtr & sub) -> bool { - return sub == subscription; - }); - if (match_sub) { - return group; - } + auto match_sub = group->find_subscription_ptrs_if( + [&subscription](const rclcpp::SubscriptionBase::SharedPtr & sub) -> bool { + return sub == subscription; + }); + if (match_sub) { + return group; } } return nullptr; @@ -178,25 +144,20 @@ MemoryStrategy::get_group_by_subscription( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_service( rclcpp::ServiceBase::SharedPtr service, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!group || !node) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto service_ref = group->find_service_ptrs_if( - [&service](const rclcpp::ServiceBase::SharedPtr & serv) -> bool { - return serv == service; - }); - if (service_ref) { - return group; - } + auto service_ref = group->find_service_ptrs_if( + [&service](const rclcpp::ServiceBase::SharedPtr & serv) -> bool { + return serv == service; + }); + if (service_ref) { + return group; } } return nullptr; @@ -205,25 +166,20 @@ MemoryStrategy::get_group_by_service( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_client( rclcpp::ClientBase::SharedPtr client, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!group || !node) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto client_ref = group->find_client_ptrs_if( - [&client](const rclcpp::ClientBase::SharedPtr & cli) -> bool { - return cli == client; - }); - if (client_ref) { - return group; - } + auto client_ref = group->find_client_ptrs_if( + [&client](const rclcpp::ClientBase::SharedPtr & cli) -> bool { + return cli == client; + }); + if (client_ref) { + return group; } } return nullptr; @@ -232,25 +188,20 @@ MemoryStrategy::get_group_by_client( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_timer( rclcpp::TimerBase::SharedPtr timer, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!group || !node) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto timer_ref = group->find_timer_ptrs_if( - [&timer](const rclcpp::TimerBase::SharedPtr & time) -> bool { - return time == timer; - }); - if (timer_ref) { - return group; - } + auto timer_ref = group->find_timer_ptrs_if( + [&timer](const rclcpp::TimerBase::SharedPtr & time) -> bool { + return time == timer; + }); + if (timer_ref) { + return group; } } return nullptr; @@ -259,25 +210,20 @@ MemoryStrategy::get_group_by_timer( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_waitable( rclcpp::Waitable::SharedPtr waitable, - const WeakNodeList & weak_nodes) + const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { + for (const auto & pair : weak_groups_to_nodes) { + auto group = pair.first.lock(); + auto node = pair.second.lock(); + if (!group || !node) { continue; } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - auto waitable_ref = group->find_waitable_ptrs_if( - [&waitable](const rclcpp::Waitable::SharedPtr & group_waitable) -> bool { - return group_waitable == waitable; - }); - if (waitable_ref) { - return group; - } + auto waitable_ref = group->find_waitable_ptrs_if( + [&waitable](const rclcpp::Waitable::SharedPtr & group_waitable) -> bool { + return group_waitable == waitable; + }); + if (waitable_ref) { + return group; } } return nullptr; diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index b69d7c2baf..c61340a7cd 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -211,9 +211,11 @@ Node::get_logger() const } rclcpp::CallbackGroup::SharedPtr -Node::create_callback_group(rclcpp::CallbackGroupType group_type) +Node::create_callback_group( + rclcpp::CallbackGroupType group_type, + bool automatically_add_to_executor_with_node) { - return node_base_->create_callback_group(group_type); + return node_base_->create_callback_group(group_type, automatically_add_to_executor_with_node); } bool diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index d8c13bd826..05779331fb 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -216,11 +216,16 @@ NodeBase::get_shared_rcl_node_handle() const } rclcpp::CallbackGroup::SharedPtr -NodeBase::create_callback_group(rclcpp::CallbackGroupType group_type) +NodeBase::create_callback_group( + rclcpp::CallbackGroupType group_type, + bool automatically_add_to_executor_with_node) { using rclcpp::CallbackGroup; using rclcpp::CallbackGroupType; - auto group = CallbackGroup::SharedPtr(new CallbackGroup(group_type)); + auto group = CallbackGroup::SharedPtr( + new CallbackGroup( + group_type, + automatically_add_to_executor_with_node)); callback_groups_.push_back(group); return group; } diff --git a/rclcpp/test/CMakeLists.txt b/rclcpp/test/CMakeLists.txt index e9e70f8063..12812e6999 100644 --- a/rclcpp/test/CMakeLists.txt +++ b/rclcpp/test/CMakeLists.txt @@ -82,6 +82,13 @@ if(TARGET test_create_subscription) "test_msgs" ) endif() +ament_add_gtest(test_add_callback_groups_to_executor rclcpp/test_add_callback_groups_to_executor.cpp) +if(TARGET test_add_callback_groups_to_executor) + target_link_libraries(test_add_callback_groups_to_executor ${PROJECT_NAME}) + ament_target_dependencies(test_add_callback_groups_to_executor + "test_msgs" + ) +endif() ament_add_gtest(test_expand_topic_or_service_name rclcpp/test_expand_topic_or_service_name.cpp) if(TARGET test_expand_topic_or_service_name) ament_target_dependencies(test_expand_topic_or_service_name diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index 70470ec17a..e330d92986 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -26,7 +27,9 @@ #include "test_msgs/srv/empty.hpp" using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; -using WeakNodeList = std::list; +typedef std::map> WeakCallbackGroupsToNodesMap; static bool test_waitable_result = false; @@ -187,10 +190,19 @@ class TestAllocatorMemoryStrategy : public ::testing::Test std::shared_ptr node, const RclWaitSetSizes & expected) { - WeakNodeList nodes; - nodes.push_back(node->get_node_base_interface()); - - allocator_memory_strategy()->collect_entities(nodes); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, + &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); EXPECT_EQ( expected.size_of_subscriptions, allocator_memory_strategy()->number_of_ready_subscriptions()); EXPECT_EQ( @@ -211,9 +223,19 @@ class TestAllocatorMemoryStrategy : public ::testing::Test std::shared_ptr node, const RclWaitSetSizes & insufficient_capacities) { - WeakNodeList nodes; - nodes.push_back(node->get_node_base_interface()); - allocator_memory_strategy()->collect_entities(nodes); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, + &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); auto context = node->get_node_base_interface()->get_context(); rcl_context_t * rcl_context = context->get_rcl_context().get(); @@ -280,26 +302,65 @@ class TestAllocatorMemoryStrategy : public ::testing::Test ::testing::AssertionResult TestGetNextEntity( std::shared_ptr node_with_entity1, std::shared_ptr node_with_entity2, - std::function get_next_entity_func) + std::function get_next_entity_func) { - WeakNodeList nodes; - auto basic_node = std::make_shared("basic_node", "ns"); - nodes.push_back(node_with_entity1->get_node_base_interface()); - nodes.push_back(basic_node->get_node_base_interface()); - allocator_memory_strategy()->collect_entities(nodes); - - rclcpp::AnyExecutable result = get_next_entity_func(nodes); + auto basic_node = create_node_with_disabled_callback_groups("basic_node"); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = basic_node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, + &basic_node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + basic_node->get_node_base_interface())); + }); + auto callback_groups1 = node_with_entity1->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups1.begin(), callback_groups1.end(), + [&weak_groups_to_nodes, + &node_with_entity1](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node_with_entity1->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); + + rclcpp::AnyExecutable result = get_next_entity_func(weak_groups_to_nodes); if (result.node_base != node_with_entity1->get_node_base_interface()) { return ::testing::AssertionFailure() << "Failed to get expected entity with specified get_next_*() function"; } - WeakNodeList uncollected_nodes; auto basic_node2 = std::make_shared("basic_node2", "ns"); - uncollected_nodes.push_back(node_with_entity2->get_node_base_interface()); - uncollected_nodes.push_back(basic_node2->get_node_base_interface()); - - rclcpp::AnyExecutable failed_result = get_next_entity_func(uncollected_nodes); + WeakCallbackGroupsToNodesMap weak_groups_to_uncollected_nodes; + auto callback_groups2 = basic_node2->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups2.begin(), callback_groups2.end(), + [&weak_groups_to_uncollected_nodes, + &basic_node2](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_uncollected_nodes.insert( + std::pair( + weak_group_ptr, + basic_node2->get_node_base_interface())); + }); + auto callback_groups3 = node_with_entity2->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups3.begin(), callback_groups3.end(), + [&weak_groups_to_uncollected_nodes, + &node_with_entity2](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_uncollected_nodes.insert( + std::pair( + weak_group_ptr, + node_with_entity2->get_node_base_interface())); + }); + rclcpp::AnyExecutable failed_result = get_next_entity_func(weak_groups_to_uncollected_nodes); if (nullptr != failed_result.node_base) { return ::testing::AssertionFailure() << "A node was retrieved with the specified get_next_*() function despite" @@ -311,16 +372,33 @@ class TestAllocatorMemoryStrategy : public ::testing::Test ::testing::AssertionResult TestGetNextEntityMutuallyExclusive( std::shared_ptr node_with_entity, - std::function get_next_entity_func) + std::function get_next_entity_func) { - WeakNodeList nodes; auto basic_node = std::make_shared("basic_node", "ns"); - - auto node_base = node_with_entity->get_node_base_interface(); auto basic_node_base = basic_node->get_node_base_interface(); - nodes.push_back(node_base); - nodes.push_back(basic_node_base); - allocator_memory_strategy()->collect_entities(nodes); + auto node_base = node_with_entity->get_node_base_interface(); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = basic_node_base->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &basic_node_base](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + basic_node_base)); + }); + callback_groups = node_base->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node_base](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node_base)); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); // It's important that these be set after collect_entities() otherwise collect_entities() will // not do anything @@ -330,7 +408,7 @@ class TestAllocatorMemoryStrategy : public ::testing::Test callback_group->can_be_taken_from() = false; } - rclcpp::AnyExecutable result = get_next_entity_func(nodes); + rclcpp::AnyExecutable result = get_next_entity_func(weak_groups_to_nodes); if (nullptr != result.node_base) { return ::testing::AssertionFailure() << @@ -357,10 +435,19 @@ class TestAllocatorMemoryStrategy : public ::testing::Test }; TEST_F(TestAllocatorMemoryStrategy, construct_destruct) { - WeakNodeList nodes; auto basic_node = create_node_with_disabled_callback_groups("basic_node"); - nodes.push_back(basic_node->get_node_base_interface()); - allocator_memory_strategy()->collect_entities(nodes); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = basic_node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &basic_node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + basic_node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_subscriptions()); EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_services()); EXPECT_EQ(0u, allocator_memory_strategy()->number_of_ready_events()); @@ -448,9 +535,18 @@ TEST_F(TestAllocatorMemoryStrategy, number_of_entities_with_timer) { TEST_F(TestAllocatorMemoryStrategy, add_handles_to_wait_set_bad_arguments) { auto node = create_node_with_subscription("subscription_node"); - WeakNodeList nodes; - nodes.push_back(node->get_node_base_interface()); - allocator_memory_strategy()->collect_entities(nodes); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); EXPECT_FALSE(allocator_memory_strategy()->add_handles_to_wait_set(nullptr)); EXPECT_TRUE(rcl_error_is_set()); rcl_reset_error(); @@ -527,9 +623,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_subscription) { auto node1 = create_node_with_subscription("node1"); auto node2 = create_node_with_subscription("node2"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_subscription(result, nodes); + allocator_memory_strategy()->get_next_subscription(result, weak_groups_to_nodes); return result; }; @@ -540,9 +636,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service) { auto node1 = create_node_with_service("node1"); auto node2 = create_node_with_service("node2"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_service(result, nodes); + allocator_memory_strategy()->get_next_service(result, weak_groups_to_nodes); return result; }; @@ -553,9 +649,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_client) { auto node1 = create_node_with_client("node1"); auto node2 = create_node_with_client("node2"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_client(result, nodes); + allocator_memory_strategy()->get_next_client(result, weak_groups_to_nodes); return result; }; @@ -566,9 +662,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_timer) { auto node1 = create_node_with_timer("node1"); auto node2 = create_node_with_timer("node2"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_timer(result, nodes); + allocator_memory_strategy()->get_next_timer(result, weak_groups_to_nodes); return result; }; @@ -583,9 +679,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { node1->get_node_waitables_interface()->add_waitable(waitable1, nullptr); node2->get_node_waitables_interface()->add_waitable(waitable2, nullptr); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); return result; }; @@ -595,9 +691,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable) { TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) { auto node = create_node_with_subscription("node"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_subscription(result, nodes); + allocator_memory_strategy()->get_next_subscription(result, weak_groups_to_nodes); return result; }; @@ -607,9 +703,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_mutually_exclusive) { TEST_F(TestAllocatorMemoryStrategy, get_next_service_mutually_exclusive) { auto node = create_node_with_service("node"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_service(result, nodes); + allocator_memory_strategy()->get_next_service(result, weak_groups_to_nodes); return result; }; @@ -619,9 +715,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service_mutually_exclusive) { TEST_F(TestAllocatorMemoryStrategy, get_next_client_mutually_exclusive) { auto node = create_node_with_client("node"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_client(result, nodes); + allocator_memory_strategy()->get_next_client(result, weak_groups_to_nodes); return result; }; @@ -631,9 +727,9 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_client_mutually_exclusive) { TEST_F(TestAllocatorMemoryStrategy, get_next_timer_mutually_exclusive) { auto node = create_node_with_timer("node"); - auto get_next_entity = [this](const WeakNodeList & nodes) { + auto get_next_entity = [this](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_timer(result, nodes); + allocator_memory_strategy()->get_next_timer(result, weak_groups_to_nodes); return result; }; @@ -647,13 +743,14 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_mutually_exclusive) { node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); node->get_node_waitables_interface()->add_waitable(waitable, callback_group); - auto get_next_entity = [this, callback_group](const WeakNodeList & nodes) { + auto get_next_entity = + [this, callback_group](const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { // This callback group isn't in the base class' callback_group list, so this needs to be done // before get_next_waitable() is called. callback_group->can_be_taken_from() = false; rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); return result; }; @@ -661,9 +758,8 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_mutually_exclusive) { } TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_out_of_scope) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; auto node = create_node_with_disabled_callback_groups("node"); - nodes.push_back(node->get_node_base_interface()); // Force subscription to go out of scope and cleanup after collecting entities. { rclcpp::SubscriptionOptions subscription_options; @@ -681,19 +777,28 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_subscription_out_of_scope) { test_msgs::msg::Empty, decltype(subscription_callback)>( "topic", qos, std::move(subscription_callback), subscription_options); - allocator_memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); } EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_subscriptions()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_subscription(result, nodes); + allocator_memory_strategy()->get_next_subscription(result, weak_groups_to_nodes); EXPECT_EQ(nullptr, result.node_base); } TEST_F(TestAllocatorMemoryStrategy, get_next_service_out_of_scope) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; auto node = create_node_with_disabled_callback_groups("node"); - nodes.push_back(node->get_node_base_interface()); // Force service to go out of scope and cleanup after collecting entities. { auto callback_group = @@ -706,19 +811,38 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_service_out_of_scope) { auto service = node->create_service( "service", std::move(service_callback), rmw_qos_profile_services_default, callback_group); - allocator_memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); } EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_services()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_service(result, nodes); + allocator_memory_strategy()->get_next_service(result, weak_groups_to_nodes); EXPECT_EQ(nullptr, result.node_base); } TEST_F(TestAllocatorMemoryStrategy, get_next_client_out_of_scope) { - WeakNodeList nodes; auto node = create_node_with_disabled_callback_groups("node"); - nodes.push_back(node->get_node_base_interface()); + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); // Force client to go out of scope and cleanup after collecting entities. { auto callback_group = @@ -727,49 +851,69 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_client_out_of_scope) { auto client = node->create_client( "service", rmw_qos_profile_services_default, callback_group); - allocator_memory_strategy()->collect_entities(nodes); + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + node->get_node_base_interface())); + + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); } EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_clients()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_client(result, nodes); + allocator_memory_strategy()->get_next_client(result, weak_groups_to_nodes); EXPECT_EQ(nullptr, result.node_base); } TEST_F(TestAllocatorMemoryStrategy, get_next_timer_out_of_scope) { - WeakNodeList nodes; auto node = create_node_with_disabled_callback_groups("node"); - nodes.push_back(node->get_node_base_interface()); - + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; // Force timer to go out of scope and cleanup after collecting entities. { auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); - auto timer = node->create_wall_timer( std::chrono::seconds(10), []() {}, callback_group); - - allocator_memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); } EXPECT_EQ(1u, allocator_memory_strategy()->number_of_ready_timers()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_timer(result, nodes); + allocator_memory_strategy()->get_next_timer(result, weak_groups_to_nodes); EXPECT_EQ(nullptr, result.node_base); } TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_out_of_scope) { - WeakNodeList nodes; auto node = create_node_with_disabled_callback_groups("node"); - nodes.push_back(node->get_node_base_interface()); - + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; // Force waitable to go out of scope and cleanup after collecting entities. { auto callback_group = node->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); - allocator_memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + allocator_memory_strategy()->collect_entities(weak_groups_to_nodes); auto waitable = std::make_shared(); node->get_node_waitables_interface()->add_waitable(waitable, callback_group); allocator_memory_strategy()->add_waitable_handle(waitable); @@ -778,6 +922,6 @@ TEST_F(TestAllocatorMemoryStrategy, get_next_waitable_out_of_scope) { EXPECT_EQ(1u, allocator_memory_strategy()->number_of_waitables()); rclcpp::AnyExecutable result; - allocator_memory_strategy()->get_next_waitable(result, nodes); + allocator_memory_strategy()->get_next_waitable(result, weak_groups_to_nodes); EXPECT_EQ(nullptr, result.node_base); } diff --git a/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp new file mode 100644 index 0000000000..deea28d915 --- /dev/null +++ b/rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp @@ -0,0 +1,299 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/node.hpp" +#include "test_msgs/msg/empty.hpp" +#include "test_msgs/msg/empty.h" + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/executors.hpp" +#include "rclcpp/executor.hpp" +#include "rclcpp/rclcpp.hpp" + +using namespace std::chrono_literals; + +template +class TestAddCallbackGroupsToExecutor : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + static void TearDownTestCase() + { + rclcpp::shutdown(); + } +}; + +using ExecutorTypes = + ::testing::Types< + rclcpp::executors::SingleThreadedExecutor, + rclcpp::executors::MultiThreadedExecutor, + rclcpp::executors::StaticSingleThreadedExecutor>; + +class ExecutorTypeNames +{ +public: + template + static std::string GetName(int idx) + { + (void)idx; + if (std::is_same()) { + return "SingleThreadedExecutor"; + } + + if (std::is_same()) { + return "MultiThreadedExecutor"; + } + + if (std::is_same()) { + return "StaticSingleThreadedExecutor"; + } + + return ""; + } +}; + +TYPED_TEST_CASE(TestAddCallbackGroupsToExecutor, ExecutorTypes, ExecutorTypeNames); + +/* + * Test adding callback groups. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups) { + using ExecutorType = TypeParam; + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + ExecutorType executor; + executor.add_callback_group(cb_grp, node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 0u); + + const rclcpp::QoS qos(10); + auto options = rclcpp::SubscriptionOptions(); + auto callback = [](const test_msgs::msg::Empty::SharedPtr) {}; + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + options.callback_group = cb_grp2; + auto subscription = + node->create_subscription("topic_name", qos, callback, options); + executor.add_callback_group(cb_grp2, node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 2u); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 2u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 0u); + + executor.add_node(node); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 2u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 1u); + + executor.remove_node(node); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 2u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 0u); + + executor.remove_callback_group(cb_grp); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 1u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 0u); + + executor.remove_callback_group(cb_grp2); + ASSERT_EQ(executor.get_manually_added_callback_groups().size(), 0u); + ASSERT_EQ(executor.get_automatically_added_callback_groups_from_nodes().size(), 0u); +} + +/* + * Test removing callback groups. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_groups) { + using ExecutorType = TypeParam; + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + ExecutorType executor; + executor.add_callback_group(cb_grp, node->get_node_base_interface()); + const rclcpp::QoS qos(10); + auto options = rclcpp::SubscriptionOptions(); + auto callback = [](const test_msgs::msg::Empty::SharedPtr) {}; + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + options.callback_group = cb_grp2; + auto subscription = + node->create_subscription("topic_name", qos, callback, options); + executor.add_callback_group(cb_grp2, node->get_node_base_interface()); + + executor.remove_callback_group(cb_grp); + ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); + executor.remove_callback_group(cb_grp2); + ASSERT_EQ(executor.get_all_callback_groups().size(), 0u); +} + +/* + * Test adding duplicate callback groups to executor. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, add_duplicate_callback_groups) +{ + rclcpp::executors::MultiThreadedExecutor executor; + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + executor.add_callback_group(cb_grp, node->get_node_base_interface()); + EXPECT_THROW( + executor.add_callback_group(cb_grp, node->get_node_base_interface()), + std::exception); +} + +/* + * Test adding callback group after node is added to executor. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, add_callback_groups_after_add_node_to_executor) +{ + rclcpp::executors::MultiThreadedExecutor executor; + auto node = std::make_shared("my_node", "/ns"); + executor.add_node(node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); + std::atomic_int timer_count {0}; + auto timer_callback = [&executor, &timer_count]() { + if (timer_count > 0) { + ASSERT_EQ(executor.get_all_callback_groups().size(), 3u); + executor.cancel(); + } + timer_count++; + }; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + auto timer2_callback = []() {}; + rclcpp::TimerBase::SharedPtr timer2_ = node->create_wall_timer( + 2s, timer2_callback, cb_grp2); + rclcpp::CallbackGroup::SharedPtr cb_grp3 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto timer3_callback = []() {}; + rclcpp::TimerBase::SharedPtr timer3_ = node->create_wall_timer( + 2s, timer3_callback, cb_grp3); + executor.spin(); +} + +/* + * Test adding unallowable callback group. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, add_unallowable_callback_groups) +{ + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_callback_group(cb_grp, node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 1u); + + const rclcpp::QoS qos(10); + auto options = rclcpp::SubscriptionOptions(); + auto callback = [](const test_msgs::msg::Empty::SharedPtr) {}; + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + options.callback_group = cb_grp2; + auto subscription = + node->create_subscription("topic_name", qos, callback, options); + executor.add_callback_group(cb_grp2, node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 2u); + + auto timer2_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp3 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + rclcpp::TimerBase::SharedPtr timer2_ = node->create_wall_timer( + 2s, timer2_callback, cb_grp3); + executor.add_node(node->get_node_base_interface()); + ASSERT_EQ(executor.get_all_callback_groups().size(), 3u); +} + +/* + * Test callback groups from one node to many executors. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, one_node_many_callback_groups_many_executors) +{ + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + rclcpp::executors::MultiThreadedExecutor timer_executor; + rclcpp::executors::MultiThreadedExecutor sub_executor; + timer_executor.add_callback_group(cb_grp, node->get_node_base_interface()); + const rclcpp::QoS qos(10); + auto options = rclcpp::SubscriptionOptions(); + auto callback = [](const test_msgs::msg::Empty::SharedPtr) {}; + rclcpp::CallbackGroup::SharedPtr cb_grp2 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + options.callback_group = cb_grp2; + auto subscription = + node->create_subscription("topic_name", qos, callback, options); + sub_executor.add_callback_group(cb_grp2, node->get_node_base_interface()); + ASSERT_EQ(sub_executor.get_all_callback_groups().size(), 1u); + ASSERT_EQ(timer_executor.get_all_callback_groups().size(), 1u); + auto timer2_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp3 = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive, false); + rclcpp::TimerBase::SharedPtr timer2 = node->create_wall_timer( + 2s, timer2_callback, cb_grp3); + sub_executor.add_node(node); + ASSERT_EQ(sub_executor.get_all_callback_groups().size(), 2u); + timer_executor.add_callback_group(cb_grp3, node->get_node_base_interface()); + ASSERT_EQ(timer_executor.get_all_callback_groups().size(), 2u); +} + +/* + * Test removing callback group from executor that its not associated with. + */ +TYPED_TEST(TestAddCallbackGroupsToExecutor, remove_callback_group) +{ + rclcpp::executors::MultiThreadedExecutor executor; + auto node = std::make_shared("my_node", "/ns"); + auto timer_callback = []() {}; + rclcpp::CallbackGroup::SharedPtr cb_grp = node->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + rclcpp::TimerBase::SharedPtr timer_ = node->create_wall_timer( + 2s, timer_callback, cb_grp); + EXPECT_THROW( + executor.remove_callback_group(cb_grp), + std::exception); + executor.add_callback_group(cb_grp, node->get_node_base_interface()); + EXPECT_NO_THROW(executor.remove_callback_group(cb_grp)); + EXPECT_THROW( + executor.remove_callback_group(cb_grp), + std::exception); +} diff --git a/rclcpp/test/rclcpp/test_find_weak_nodes.cpp b/rclcpp/test/rclcpp/test_find_weak_nodes.cpp index f4e58e8d19..d402f02a11 100644 --- a/rclcpp/test/rclcpp/test_find_weak_nodes.cpp +++ b/rclcpp/test/rclcpp/test_find_weak_nodes.cpp @@ -36,22 +36,24 @@ TEST_F(TestFindWeakNodes, allocator_strategy_with_weak_nodes) { rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); auto existing_node = rclcpp::Node::make_shared("existing_node"); auto dead_node = rclcpp::Node::make_shared("dead_node"); - rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes; - weak_nodes.push_back(existing_node->get_node_base_interface()); - weak_nodes.push_back(dead_node->get_node_base_interface()); + auto existing_group = existing_node->get_node_base_interface()->get_default_callback_group(); + auto dead_group = dead_node->get_node_base_interface()->get_default_callback_group(); + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + weak_groups_to_nodes[existing_group] = existing_node->get_node_base_interface(); + weak_groups_to_nodes[dead_group] = dead_node->get_node_base_interface(); // AND // Delete dead_node, creating a dangling pointer in weak_nodes dead_node.reset(); - ASSERT_FALSE(weak_nodes.front().expired()); - ASSERT_TRUE(weak_nodes.back().expired()); + ASSERT_FALSE(weak_groups_to_nodes[existing_group].expired()); + ASSERT_TRUE(weak_groups_to_nodes[dead_group].expired()); // WHEN - bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes); + bool has_invalid_weak_groups_or_nodes = memory_strategy->collect_entities(weak_groups_to_nodes); // THEN // The result of finding dangling node pointers should be true - ASSERT_TRUE(has_invalid_weak_nodes); + ASSERT_TRUE(has_invalid_weak_groups_or_nodes); // Prevent memory leak due to the order of destruction memory_strategy->clear_handles(); @@ -64,18 +66,20 @@ TEST_F(TestFindWeakNodes, allocator_strategy_no_weak_nodes) { rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>(); auto existing_node1 = rclcpp::Node::make_shared("existing_node1"); auto existing_node2 = rclcpp::Node::make_shared("existing_node2"); - rclcpp::memory_strategy::MemoryStrategy::WeakNodeList weak_nodes; - weak_nodes.push_back(existing_node1->get_node_base_interface()); - weak_nodes.push_back(existing_node2->get_node_base_interface()); - ASSERT_FALSE(weak_nodes.front().expired()); - ASSERT_FALSE(weak_nodes.back().expired()); + auto existing_group1 = existing_node1->get_node_base_interface()->get_default_callback_group(); + auto existing_group2 = existing_node2->get_node_base_interface()->get_default_callback_group(); + rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap weak_groups_to_nodes; + weak_groups_to_nodes[existing_group1] = existing_node1->get_node_base_interface(); + weak_groups_to_nodes[existing_group2] = existing_node2->get_node_base_interface(); + ASSERT_FALSE(weak_groups_to_nodes[existing_group1].expired()); + ASSERT_FALSE(weak_groups_to_nodes[existing_group2].expired()); // WHEN - bool has_invalid_weak_nodes = memory_strategy->collect_entities(weak_nodes); + bool has_invalid_weak_groups_or_nodes = memory_strategy->collect_entities(weak_groups_to_nodes); // THEN // The result of finding dangling node pointers should be false - ASSERT_FALSE(has_invalid_weak_nodes); + ASSERT_FALSE(has_invalid_weak_groups_or_nodes); // Prevent memory leak due to the order of destruction memory_strategy->clear_handles(); diff --git a/rclcpp/test/rclcpp/test_memory_strategy.cpp b/rclcpp/test/rclcpp/test_memory_strategy.cpp index f076094643..a96a49e67c 100644 --- a/rclcpp/test/rclcpp/test_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/test_memory_strategy.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -24,7 +25,9 @@ #include "test_msgs/srv/empty.hpp" using rclcpp::memory_strategy::MemoryStrategy; -using WeakNodeList = std::list; +typedef std::map> WeakCallbackGroupsToNodesMap; /** * Mock Waitable class @@ -75,13 +78,22 @@ TEST_F(TestMemoryStrategy, construct_destruct) { } TEST_F(TestMemoryStrategy, get_subscription_by_handle) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; std::shared_ptr subscription_handle; rclcpp::SubscriptionBase::SharedPtr found_subscription = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); @@ -97,29 +109,38 @@ TEST_F(TestMemoryStrategy, get_subscription_by_handle) { EXPECT_EQ( subscription, - memory_strategy()->get_subscription_by_handle(subscription_handle, nodes)); + memory_strategy()->get_subscription_by_handle(subscription_handle, weak_groups_to_nodes)); } // subscription goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_subscription_by_handle(subscription_handle, nodes)); + memory_strategy()->get_subscription_by_handle(subscription_handle, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_subscription_by_handle(subscription_handle, nodes)); + memory_strategy()->get_subscription_by_handle(subscription_handle, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_subscription_by_handle(subscription_handle, nodes)); + memory_strategy()->get_subscription_by_handle(subscription_handle, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_service_by_handle) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; std::shared_ptr service_handle; rclcpp::ServiceBase::SharedPtr found_service = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); @@ -127,7 +148,11 @@ TEST_F(TestMemoryStrategy, get_service_by_handle) { [](const test_msgs::srv::Empty::Request::SharedPtr, test_msgs::srv::Empty::Response::SharedPtr) {}; const rclcpp::QoS qos(10); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + node->get_node_base_interface())); { auto service = node->create_service( "service", std::move(service_callback), @@ -137,29 +162,38 @@ TEST_F(TestMemoryStrategy, get_service_by_handle) { EXPECT_EQ( service, - memory_strategy()->get_service_by_handle(service_handle, nodes)); + memory_strategy()->get_service_by_handle(service_handle, weak_groups_to_nodes)); } // service goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_service_by_handle(service_handle, nodes)); + memory_strategy()->get_service_by_handle(service_handle, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_service_by_handle(service_handle, nodes)); + memory_strategy()->get_service_by_handle(service_handle, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_service_by_handle(service_handle, nodes)); + memory_strategy()->get_service_by_handle(service_handle, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_client_by_handle) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; std::shared_ptr client_handle; rclcpp::ClientBase::SharedPtr found_client = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); @@ -168,32 +202,46 @@ TEST_F(TestMemoryStrategy, get_client_by_handle) { "service", rmw_qos_profile_services_default, callback_group); client_handle = client->get_client_handle(); + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + node->get_node_base_interface())); EXPECT_EQ( client, - memory_strategy()->get_client_by_handle(client_handle, nodes)); + memory_strategy()->get_client_by_handle(client_handle, weak_groups_to_nodes)); } // client goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_client_by_handle(client_handle, nodes)); + memory_strategy()->get_client_by_handle(client_handle, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_client_by_handle(client_handle, nodes)); + memory_strategy()->get_client_by_handle(client_handle, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_client_by_handle(client_handle, nodes)); + memory_strategy()->get_client_by_handle(client_handle, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_timer_by_handle) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; std::shared_ptr timer_handle; rclcpp::TimerBase::SharedPtr found_timer = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); @@ -201,58 +249,85 @@ TEST_F(TestMemoryStrategy, get_timer_by_handle) { auto timer_callback = []() {}; auto timer = node->create_wall_timer( std::chrono::milliseconds(1), timer_callback, callback_group); + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + node->get_node_base_interface())); timer_handle = timer->get_timer_handle(); EXPECT_EQ( timer, - memory_strategy()->get_timer_by_handle(timer_handle, nodes)); + memory_strategy()->get_timer_by_handle(timer_handle, weak_groups_to_nodes)); } // timer goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_timer_by_handle(timer_handle, nodes)); + memory_strategy()->get_timer_by_handle(timer_handle, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_timer_by_handle(timer_handle, nodes)); + memory_strategy()->get_timer_by_handle(timer_handle, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_timer_by_handle(timer_handle, nodes)); + memory_strategy()->get_timer_by_handle(timer_handle, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_node_by_group) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::CallbackGroup::SharedPtr callback_group = nullptr; { auto node = std::make_shared("node", "ns"); auto node_handle = node->get_node_base_interface(); - nodes.push_back(node_handle); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node_handle->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node_handle](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node_handle)); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); EXPECT_EQ( nullptr, - memory_strategy()->get_node_by_group(nullptr, nodes)); + memory_strategy()->get_node_by_group(nullptr, weak_groups_to_nodes)); callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + node->get_node_base_interface())); EXPECT_EQ( node_handle, - memory_strategy()->get_node_by_group(callback_group, nodes)); + memory_strategy()->get_node_by_group(callback_group, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_node_by_group(callback_group, nodes)); + memory_strategy()->get_node_by_group(callback_group, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_group_by_subscription) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::SubscriptionBase::SharedPtr subscription = nullptr; rclcpp::CallbackGroup::SharedPtr callback_group = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { // This group is just used to test that a callback group that is held as a weak pointer // by node, doesn't confuse get_group_by_subscription() when it goes out of scope @@ -273,27 +348,40 @@ TEST_F(TestMemoryStrategy, get_group_by_subscription) { subscription = node->create_subscription< test_msgs::msg::Empty, decltype(subscription_callback)>( "topic", qos, std::move(subscription_callback), subscription_options); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr(node->get_node_base_interface()))); EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_subscription(subscription, nodes)); + memory_strategy()->get_group_by_subscription(subscription, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_subscription(subscription, nodes)); + memory_strategy()->get_group_by_subscription(subscription, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_subscription(subscription, nodes)); + memory_strategy()->get_group_by_subscription(subscription, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_group_by_service) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::ServiceBase::SharedPtr service = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); @@ -305,96 +393,139 @@ TEST_F(TestMemoryStrategy, get_group_by_service) { service = node->create_service( "service", std::move(service_callback), rmw_qos_profile_services_default, callback_group); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr(node->get_node_base_interface()))); EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_service(service, nodes)); + memory_strategy()->get_group_by_service(service, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_service(service, nodes)); + memory_strategy()->get_group_by_service(service, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_service(service, nodes)); + memory_strategy()->get_group_by_service(service, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_group_by_client) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::ClientBase::SharedPtr client = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); client = node->create_client( "service", rmw_qos_profile_services_default, callback_group); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr(node->get_node_base_interface()))); EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_client(client, nodes)); + memory_strategy()->get_group_by_client(client, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_client(client, nodes)); + memory_strategy()->get_group_by_client(client, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_client(client, nodes)); + memory_strategy()->get_group_by_client(client, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_group_by_timer) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::TimerBase::SharedPtr timer = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); auto timer_callback = []() {}; timer = node->create_wall_timer( std::chrono::milliseconds(1), timer_callback, callback_group); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr(node->get_node_base_interface()))); EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_timer(timer, nodes)); + memory_strategy()->get_group_by_timer(timer, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_timer(timer, nodes)); + memory_strategy()->get_group_by_timer(timer, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_timer(timer, nodes)); + memory_strategy()->get_group_by_timer(timer, weak_groups_to_nodes)); } TEST_F(TestMemoryStrategy, get_group_by_waitable) { - WeakNodeList nodes; + WeakCallbackGroupsToNodesMap weak_groups_to_nodes; rclcpp::Waitable::SharedPtr waitable = nullptr; { auto node = std::make_shared("node", "ns"); - nodes.push_back(node->get_node_base_interface()); - memory_strategy()->collect_entities(nodes); + auto callback_groups = node->get_node_base_interface()->get_callback_groups(); + std::for_each( + callback_groups.begin(), callback_groups.end(), + [&weak_groups_to_nodes, &node](rclcpp::CallbackGroup::WeakPtr weak_group_ptr) { + weak_groups_to_nodes.insert( + std::pair( + weak_group_ptr, + node->get_node_base_interface())); + }); + memory_strategy()->collect_entities(weak_groups_to_nodes); { waitable = std::make_shared(); auto callback_group = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); node->get_node_waitables_interface()->add_waitable(waitable, callback_group); - + weak_groups_to_nodes.insert( + std::pair( + rclcpp::CallbackGroup::WeakPtr(callback_group), + rclcpp::node_interfaces::NodeBaseInterface::WeakPtr(node->get_node_base_interface()))); EXPECT_EQ( callback_group, - memory_strategy()->get_group_by_waitable(waitable, nodes)); + memory_strategy()->get_group_by_waitable(waitable, weak_groups_to_nodes)); } // callback_group goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_waitable(waitable, nodes)); + memory_strategy()->get_group_by_waitable(waitable, weak_groups_to_nodes)); } // Node goes out of scope EXPECT_EQ( nullptr, - memory_strategy()->get_group_by_waitable(waitable, nodes)); + memory_strategy()->get_group_by_waitable(waitable, weak_groups_to_nodes)); }