diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index ffae9cc7ee..d5eeb61046 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -74,7 +74,6 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/experimental/timers_manager.cpp src/rclcpp/future_return_code.cpp src/rclcpp/generic_publisher.cpp - src/rclcpp/generic_subscription.cpp src/rclcpp/graph_listener.cpp src/rclcpp/guard_condition.cpp src/rclcpp/init_options.cpp diff --git a/rclcpp/include/rclcpp/create_generic_subscription.hpp b/rclcpp/include/rclcpp/create_generic_subscription.hpp index c2549721b5..56b009e898 100644 --- a/rclcpp/include/rclcpp/create_generic_subscription.hpp +++ b/rclcpp/include/rclcpp/create_generic_subscription.hpp @@ -38,7 +38,7 @@ namespace rclcpp * * \param topics_interface NodeTopicsInterface pointer used in parts of the setup. * \param topic_name Topic name - * \param topic_type Topic type + * \param type_support_handle Topic type support * \param qos %QoS settings * \param callback Callback for new messages of serialized form * \param options %Publisher options. @@ -51,7 +51,7 @@ template< std::shared_ptr create_generic_subscription( rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, const std::string & topic_name, - const std::string & topic_type, + const rosidl_message_type_support_t & type_support_handle, const rclcpp::QoS & qos, CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options = ( @@ -59,10 +59,10 @@ std::shared_ptr create_generic_subscription( ) ) { - auto ts_lib = rclcpp::get_typesupport_library( - topic_type, "rosidl_typesupport_cpp"); - auto allocator = options.get_allocator(); + typename GenericSubscription::MessageMemoryStrategyType::SharedPtr msg_mem_strat = ( + GenericSubscription::MessageMemoryStrategyType::create_default() + ); using rclcpp::AnySubscriptionCallback; AnySubscriptionCallback @@ -71,18 +71,56 @@ std::shared_ptr create_generic_subscription( auto subscription = std::make_shared( topics_interface->get_node_base_interface(), - std::move(ts_lib), + type_support_handle, topic_name, - topic_type, qos, any_subscription_callback, - options); + options, + msg_mem_strat); topics_interface->add_subscription(subscription, options.callback_group); return subscription; } +/// Create and return a GenericSubscription. +/** + * The returned pointer will never be empty, but this function can throw various exceptions, for + * instance when the message's package can not be found on the AMENT_PREFIX_PATH. + * + * \param topics_interface NodeTopicsInterface pointer used in parts of the setup. + * \param topic_name Topic name + * \param topic_type Topic type + * \param qos %QoS settings + * \param callback Callback for new messages of serialized form + * \param options %Publisher options. + * Not all publisher options are currently respected, the only relevant options for this + * publisher are `event_callbacks`, `use_default_callbacks`, and `%callback_group`. + */ +template< + typename CallbackT, + typename AllocatorT = std::allocator> +std::shared_ptr create_generic_subscription( + rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr topics_interface, + const std::string & topic_name, + const std::string & topic_type, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ) +) +{ + auto ts_lib = rclcpp::get_typesupport_library( + topic_type, "rosidl_typesupport_cpp"); + + return create_generic_subscription( + topics_interface, topic_name, + *rclcpp::get_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib), + qos, std::forward(callback), options + ); +} + } // namespace rclcpp #endif // RCLCPP__CREATE_GENERIC_SUBSCRIPTION_HPP_ diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index f6088a33c3..f87ebb1b16 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -35,6 +35,53 @@ namespace rclcpp namespace detail { +/// Create and return a publisher of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface. + */ +template< + typename MessageT = SerializedMessage, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_publisher( + NodeT & node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + // Extract the NodeTopicsInterface from the NodeT. + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics_interface = get_node_topics_interface(node); + auto node_parameters = node.get_node_parameters_interface(); + const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ? + rclcpp::detail::declare_qos_parameters( + options.qos_overriding_options, node_parameters, + node_topics_interface->resolve_topic_name(topic_name), + qos, rclcpp::detail::PublisherQosParametersTraits{}) : + qos; + + // Create the publisher. + auto pub = node_topics_interface->create_publisher( + topic_name, + rclcpp::create_publisher_factory( + options, + type_support), + actual_qos + ); + + // Add the publisher to the node topics interface. + node_topics_interface->add_publisher(pub, options.callback_group); + + return std::dynamic_pointer_cast(pub); +} + /// Create and return a publisher of the given MessageT type. template< typename MessageT, diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 54ff0c0dee..86e5d92607 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -36,9 +36,14 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/qos.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_adapter.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/asserts.hpp" + namespace rclcpp { @@ -185,6 +190,56 @@ class IntraProcessManager uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) + { + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indices = SplitSubscriptionsIndices; + + std::shared_lock lock(mutex_); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); + return; + } + const auto & sub_ids = publisher_it->second; + + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indices::ownership_other].size() + + sub_ids.take_subscriptions[Indices::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + intra_process_publisher_id, + message.get(), allocator, + sub_ids.take_subscriptions[Indices::ownership_other], + sub_ids.take_subscriptions[Indices::shared_other] + ); + } + + do_intra_process_publish_same_type( + intra_process_publisher_id, + std::move(message), allocator, + sub_ids.take_subscriptions[Indices::ownership_same], + sub_ids.take_subscriptions[Indices::shared_same] + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete + > + void + do_intra_process_publish_same_type( + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -199,41 +254,40 @@ class IntraProcessManager "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return; } - const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { + if (take_ownership_subscriptions.empty()) { // None of the buffers require ownership, so we promote the pointer std::shared_ptr msg = std::move(message); this->template add_shared_msg_to_buffers( - msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) + msg, take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) { // There is at maximum 1 buffer that does not require ownership. // So this case is equivalent to all the buffers requiring ownership // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); + std::vector concatenated_vector(take_shared_subscriptions); concatenated_vector.insert( concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); this->template add_owned_msg_to_buffers( std::move(message), concatenated_vector, allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) { // Construct a new shared pointer from the message // for the buffers that do not require ownership auto shared_msg = std::allocate_shared(allocator, *message); this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); + shared_msg, take_shared_subscriptions); this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + std::move(message), take_ownership_subscriptions, allocator); } } @@ -248,6 +302,57 @@ class IntraProcessManager uint64_t intra_process_publisher_id, std::unique_ptr message, typename allocator::AllocRebind::allocator_type & allocator) + { + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indices = SplitSubscriptionsIndices; + + std::shared_lock lock(mutex_); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling do_intra_process_publish for invalid or no longer existing publisher id"); + return nullptr; + } + const auto & sub_ids = publisher_it->second; + + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indices::ownership_other].size() + + sub_ids.take_subscriptions[Indices::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + intra_process_publisher_id, + message.get(), allocator, + sub_ids.take_subscriptions[Indices::ownership_other], + sub_ids.take_subscriptions[Indices::shared_other] + ); + } + + return do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(message), allocator, + sub_ids.take_subscriptions[Indices::ownership_same], + sub_ids.take_subscriptions[Indices::shared_same] + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete + > + std::shared_ptr + do_intra_process_publish_and_return_shared_same_type( + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) { using MessageAllocTraits = allocator::AllocRebind; using MessageAllocatorT = typename MessageAllocTraits::allocator_type; @@ -262,14 +367,13 @@ class IntraProcessManager "Calling do_intra_process_publish for invalid or no longer existing publisher id"); return nullptr; } - const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { + if (take_ownership_subscriptions.empty()) { // If there are no owning, just convert to shared. std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { + if (!take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); + shared_msg, take_shared_subscriptions); } return shared_msg; } else { @@ -277,15 +381,15 @@ class IntraProcessManager // do not require ownership and to return. auto shared_msg = std::allocate_shared(allocator, *message); - if (!sub_ids.take_shared_subscriptions.empty()) { + if (!take_shared_subscriptions.empty()) { this->template add_shared_msg_to_buffers( shared_msg, - sub_ids.take_shared_subscriptions); + take_shared_subscriptions); } - if (!sub_ids.take_ownership_subscriptions.empty()) { + if (!take_ownership_subscriptions.empty()) { this->template add_owned_msg_to_buffers( std::move(message), - sub_ids.take_ownership_subscriptions, + take_ownership_subscriptions, allocator); } return shared_msg; @@ -314,8 +418,39 @@ class IntraProcessManager private: struct SplittedSubscriptions { - std::vector take_shared_subscriptions; - std::vector take_ownership_subscriptions; + enum + { + IndexSharedTyped = 0u, IndexSharedSerialized = 1u, + IndexOwnershipTyped = 2u, IndexOwnershipSerialized = 3u, + IndexNum = 4u + }; + + /// get the index for "take_subscriptions" depending on shared/serialized + constexpr static uint32_t + get_index(const bool is_shared, const bool is_serialized) + { + return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) + + (is_shared ? IndexSharedTyped : IndexOwnershipTyped); + } + + std::vector take_subscriptions[IndexNum]; + }; + + template + struct SplitSubscriptionsIndices + { + constexpr static auto ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized); + constexpr static auto shared_same = SplittedSubscriptions::get_index( + true, + is_serialized); + constexpr static auto ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized); + constexpr static auto shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized); }; using SubscriptionMap = @@ -334,7 +469,9 @@ class IntraProcessManager RCLCPP_PUBLIC void - insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + insert_sub_id_for_pub( + uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method, + bool is_serialized_subscriber); RCLCPP_PUBLIC bool @@ -342,6 +479,158 @@ class IntraProcessManager rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const MessageT * unsupported_message, + typename allocator::AllocRebind::allocator_type &, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + typedef std::allocator SerializedAlloc; + using SerializedAllocatorTraits = allocator::AllocRebind; + + SerializedAllocatorTraits::allocator_type serialized_allocator; + + auto ptr = SerializedAllocatorTraits::allocate(serialized_allocator, 1); + SerializedAllocatorTraits::construct(serialized_allocator, ptr); + std::unique_ptr serialized_message(ptr); + if constexpr (!std::is_same::value) { + Serialization serializer; + serializer.serialize_message(unsupported_message, serialized_message.get()); + } else { + (void)unsupported_message; + throw std::runtime_error("Serialized message cannot be serialized."); + } + + this->template do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(serialized_message), + serialized_allocator, + take_ownership_subscriptions, + take_shared_subscriptions + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + !std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const MessageT * unsupported_message, + typename allocator::AllocRebind::allocator_type &, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + typedef std::allocator SerializedAlloc; + using SerializedAllocatorTraits = allocator::AllocRebind; + + SerializedAllocatorTraits::allocator_type serialized_allocator; + + auto ptr = SerializedAllocatorTraits::allocate(serialized_allocator, 1); + SerializedAllocatorTraits::construct(serialized_allocator, ptr); + std::unique_ptr serialized_message(ptr); + Serialization serializer; + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message( + *unsupported_message, + ros_msg); + serializer.serialize_message(&ros_msg, serialized_message.get()); + + this->template do_intra_process_publish_and_return_shared_same_type( + intra_process_publisher_id, + std::move(serialized_message), + serialized_allocator, + take_ownership_subscriptions, + take_shared_subscriptions + ); + } + + template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete> + typename std::enable_if_t< + !std::is_same::value, + void + > + do_intra_process_publish_other_type( + uint64_t intra_process_publisher_id, + const rclcpp::SerializedMessage * unsupported_message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + for (const auto id : take_ownership_subscriptions) { + auto subscription_it = subscriptions_.find(id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + subscriptions_.erase(id); + continue; + } + + if (subscription_base->serve_serialized_message( + unsupported_message, + this, + intra_process_publisher_id, + static_cast(&allocator), + take_ownership_subscriptions, + take_shared_subscriptions + )) + { + // message was successfully converted and forwarded, so stop further forwarding + return; + } + } + + for (const auto id : take_shared_subscriptions) { + auto subscription_it = subscriptions_.find(id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.lock(); + if (subscription_base == nullptr) { + subscriptions_.erase(id); + continue; + } + + if (subscription_base->serve_serialized_message( + unsupported_message, + this, + intra_process_publisher_id, + static_cast(&allocator), + take_ownership_subscriptions, + take_shared_subscriptions + )) + { + // message was successfully converted and forwarded, so stop further forwarding + return; + } + } + } + template< typename MessageT, typename Alloc, @@ -524,6 +813,40 @@ class IntraProcessManager mutable std::shared_timed_mutex mutex_; }; +namespace detail +{ +/** + * Helper function to expose method of IntraProcessManager + * for publishing messages with same data type. + * + * This is needed as the publisher of serialized message is not aware of the subscribed + * data type. While the subscription has all needed information (MessageT, Allocator) to + * cast and deserialize the message. The type information is forwarded by the helper function. + */ +template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete +> +void do_intra_process_publish_same_type( + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename rclcpp::allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) +{ + intraprocess_manager->template do_intra_process_publish_same_type( + intra_process_publisher_id, + std::move(message), + allocator, + take_ownership_subscriptions, + take_shared_subscriptions); +} +} // namespace detail + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index c590207b90..9b97a48461 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -19,13 +19,16 @@ #include #include #include +#include #include "rcl/wait.h" #include "rmw/impl/cpp/demangle.hpp" +#include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/guard_condition.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp @@ -33,6 +36,25 @@ namespace rclcpp namespace experimental { +// Forward declarations to reuse methods from IntraProcessManager +class IntraProcessManager; +namespace detail +{ +template< + typename MessageT, + typename ROSMessageType, + typename Alloc, + typename Deleter = std::default_delete +> +void do_intra_process_publish_same_type( + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions); +} // namespace detail + class SubscriptionIntraProcessBase : public rclcpp::Waitable { public: @@ -176,6 +198,36 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable on_new_message_callback_ = nullptr; } + /// Check if subscription type is a serialized message type. + RCLCPP_PUBLIC + virtual bool + is_serialized() const = 0; + + /// Convert serialized message to ROS message type and serve it back to IPM. + /** + * Convert serialized message to ROS message type and serve it back to IPM. + * This is needed as the publisher of serialized message is not aware of the subscribed + * data type. While the subscription has all needed information (MessageT, Allocator) to + * cast and deserialize the message. + * + * \param serialized_message serialized message which needs to be de-serialized. + * \param intraprocess_manager intraprocess manger to which the de-serialized messaged should be forwarded. + * \param intra_process_publisher_id id of publisher. + * \param untyped_allocator pointer to allocator of message. + * \param take_ownership_subscriptions subscription ids which takes ownership. + * \param take_shared_subscriptions subscription ids with shared ownership. + * \return true for success. + */ + RCLCPP_PUBLIC + virtual bool + serve_serialized_message( + const rclcpp::SerializedMessage * serialized_message, + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + void * untyped_allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) = 0; + protected: std::recursive_mutex callback_mutex_; std::function on_new_message_callback_ {nullptr}; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index f564cbb047..4e30714b38 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rcl/error_handling.h" #include "rcl/guard_condition.h" @@ -169,6 +170,72 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff return buffer_->use_take_shared_method(); } + bool + is_serialized() const override + { + return serialization_traits::is_serialized_message_class::value; + } + + SubscribedTypeUniquePtr deserialize_message( + const rclcpp::SerializedMessage * serialized_message) + { + if (serialized_message == nullptr) { + // The method is only allowed to be called with valid data (serialized case). + throw std::runtime_error("nullptr provided for serialized message"); + } + + auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); + SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); + if constexpr (std::is_same::value) { + (void)serialized_message; + throw std::runtime_error("Serialized message cannot be serialized."); + } else if constexpr (std::is_same::value) { + Serialization serializer; + serializer.deserialize_message(serialized_message, ptr); + } else { + ROSMessageType ros_msg; + Serialization serializer; + serializer.deserialize_message(serialized_message, &ros_msg); + rclcpp::TypeAdapter::convert_to_custom(ros_msg, *ptr); + } + + return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_); + } + + bool serve_serialized_message( + const rclcpp::SerializedMessage * serialized_message, + IntraProcessManager * intraprocess_manager, + uint64_t intra_process_publisher_id, + void * untyped_allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) override + { + if (intraprocess_manager == nullptr) { + // The method is only allowed to be called with valid data (serialized case). + throw std::runtime_error("nullptr provided for serialized inter-process manager"); + } + + typedef typename allocator::AllocRebind::allocator_type AllocType; + auto allocator = static_cast(untyped_allocator); + + if (allocator == nullptr) { + throw std::runtime_error( + "failed to cast allocator " + "which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + + detail::do_intra_process_publish_same_type( + intraprocess_manager, + intra_process_publisher_id, + deserialize_message(serialized_message), + *allocator, + take_ownership_subscriptions, + take_shared_subscriptions); + + return true; + } + size_t available_capacity() const override { return buffer_->available_capacity(); diff --git a/rclcpp/include/rclcpp/generic_subscription.hpp b/rclcpp/include/rclcpp/generic_subscription.hpp index e9bf79deea..9f0a41d994 100644 --- a/rclcpp/include/rclcpp/generic_subscription.hpp +++ b/rclcpp/include/rclcpp/generic_subscription.hpp @@ -16,150 +16,12 @@ #ifndef RCLCPP__GENERIC_SUBSCRIPTION_HPP_ #define RCLCPP__GENERIC_SUBSCRIPTION_HPP_ -#include -#include -#include - -#include "rcpputils/shared_library.hpp" - -#include "rclcpp/callback_group.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" -#include "rclcpp/node_interfaces/node_topics_interface.hpp" -#include "rclcpp/qos.hpp" -#include "rclcpp/serialized_message.hpp" -#include "rclcpp/subscription_base.hpp" -#include "rclcpp/typesupport_helpers.hpp" -#include "rclcpp/visibility_control.hpp" +#include "rclcpp/subscription.hpp" namespace rclcpp { -/// %Subscription for serialized messages whose type is not known at compile time. -/** - * Since the type is not known at compile time, this is not a template, and the dynamic library - * containing type support information has to be identified and loaded based on the type name. - * - * It does not support intra-process handling. - */ -class GenericSubscription : public rclcpp::SubscriptionBase -{ -public: - // cppcheck-suppress unknownMacro - RCLCPP_SMART_PTR_DEFINITIONS(GenericSubscription) - - /// Constructor. - /** - * In order to properly subscribe to a topic, this subscription needs to be added to - * the node_topic_interface of the node passed into this constructor. - * - * \sa rclcpp::Node::create_generic_subscription() or rclcpp::create_generic_subscription() for - * creating an instance of this class and adding it to the node_topic_interface. - * - * \param node_base Pointer to parent node's NodeBaseInterface - * \param ts_lib Type support library, needs to correspond to topic_type - * \param topic_name Topic name - * \param topic_type Topic type - * \param qos %QoS settings - * \param callback Callback for new messages of serialized form - * \param options %Subscription options. - * Not all subscription options are currently respected, the only relevant options for this - * subscription are `event_callbacks`, `use_default_callbacks`, `ignore_local_publications`, and - * `%callback_group`. - */ - template> - GenericSubscription( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const std::shared_ptr ts_lib, - const std::string & topic_name, - const std::string & topic_type, - const rclcpp::QoS & qos, - AnySubscriptionCallback callback, - const rclcpp::SubscriptionOptionsWithAllocator & options) - : SubscriptionBase( - node_base, - *rclcpp::get_message_typesupport_handle(topic_type, "rosidl_typesupport_cpp", *ts_lib), - topic_name, - options.to_rcl_subscription_options(qos), - options.event_callbacks, - options.use_default_callbacks, - DeliveredMessageKind::SERIALIZED_MESSAGE), - callback_([callback]( - std::shared_ptr serialized_message, - const rclcpp::MessageInfo & message_info) mutable { - callback.dispatch(serialized_message, message_info); - }), - ts_lib_(ts_lib) - {} - - RCLCPP_PUBLIC - virtual ~GenericSubscription() = default; - - // Same as create_serialized_message() as the subscription is to serialized_messages only - RCLCPP_PUBLIC - std::shared_ptr create_message() override; - - RCLCPP_PUBLIC - std::shared_ptr create_serialized_message() override; - - /// Cast the message to a rclcpp::SerializedMessage and call the callback. - RCLCPP_PUBLIC - void handle_message( - std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override; - - /// Handle dispatching rclcpp::SerializedMessage to user callback. - RCLCPP_PUBLIC - void - handle_serialized_message( - const std::shared_ptr & serialized_message, - const rclcpp::MessageInfo & message_info) override; - - /// This function is currently not implemented. - RCLCPP_PUBLIC - void handle_loaned_message( - void * loaned_message, const rclcpp::MessageInfo & message_info) override; - - // Same as return_serialized_message() as the subscription is to serialized_messages only - RCLCPP_PUBLIC - void return_message(std::shared_ptr & message) override; - - RCLCPP_PUBLIC - void return_serialized_message(std::shared_ptr & message) override; - - - // DYNAMIC TYPE ================================================================================== - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr get_shared_dynamic_message_type() - override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr get_shared_dynamic_message() override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr - get_shared_dynamic_serialization_support() override; - - RCLCPP_PUBLIC - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr create_dynamic_message() override; - - RCLCPP_PUBLIC - void return_dynamic_message( - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) override; - - RCLCPP_PUBLIC - void handle_dynamic_message( - const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, - const rclcpp::MessageInfo & message_info) override; - -private: - RCLCPP_DISABLE_COPY(GenericSubscription) - - std::function, - const rclcpp::MessageInfo)> callback_; - // The type support library should stay loaded, so it is stored in the GenericSubscription - std::shared_ptr ts_lib_; -}; +using GenericSubscription = rclcpp::Subscription; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 50b96bbeaf..97bd8747cc 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -371,6 +371,34 @@ class Node : public std::enable_shared_from_this ) ); + /// Create and return a GenericSubscription. + /** + * The returned pointer will never be empty, but this function can throw various exceptions, for + * instance when the message's package can not be found on the AMENT_PREFIX_PATH. + * + * \param[in] topic_name Topic name + * \param[in] type_support_handle Topic type support + * \param[in] qos %QoS settings + * \param[in] callback Callback for new messages of serialized form + * \param[in] options %Subscription options. + * Not all subscription options are currently respected, the only relevant options for this + * subscription are `event_callbacks`, `use_default_callbacks`, `ignore_local_publications`, and + * `%callback_group`. + * \return Shared pointer to the created generic subscription. + */ + template< + typename CallbackT, + typename AllocatorT = std::allocator> + std::shared_ptr create_generic_subscription( + const std::string & topic_name, + const rosidl_message_type_support_t & type_support_handle, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + ) + ); + /// Declare and initialize a parameter, return the effective value. /** * This method is used to declare that a parameter exists on this node. diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index d55a23f9c1..80186d0f14 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -240,6 +240,25 @@ Node::create_generic_subscription( ); } +template +std::shared_ptr +Node::create_generic_subscription( + const std::string & topic_name, + const rosidl_message_type_support_t & type_support_handle, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options) +{ + return rclcpp::create_generic_subscription( + node_topics_, + extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), + type_support_handle, + qos, + std::forward(callback), + options + ); +} + template auto diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index c03220f38b..92dc1b9e26 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -78,7 +78,8 @@ class Publisher : public PublisherBase { public: static_assert( - rclcpp::is_ros_compatible_type::value, + rclcpp::is_ros_compatible_type::value || std::is_same::value, "given message type is not compatible with ROS and cannot be used with a Publisher"); /// MessageT::custom_type if MessageT is a TypeAdapter, otherwise just MessageT. @@ -121,16 +122,18 @@ class Publisher : public PublisherBase * \param[in] topic Name of the topic to publish to. * \param[in] qos QoS profile for Subcription. * \param[in] options Options for the subscription. + * \param[in] type_support Type support for the subscribed topic. */ Publisher( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) : PublisherBase( node_base, topic, - rclcpp::get_message_type_support_handle(), + type_support, options.template to_rcl_publisher_options(qos), // NOTE(methylDragon): Passing these args separately is necessary for event binding options.event_callbacks, @@ -144,6 +147,19 @@ class Publisher : public PublisherBase // Setup continues in the post construction method, post_init_setup(). } + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : Publisher( + node_base, + topic, + qos, + options, + rclcpp::get_message_type_support_handle()) + {} + /// Called post construction, so that construction may continue after shared_from_this() works. virtual void @@ -248,6 +264,40 @@ class Publisher : public PublisherBase } } + /// Publish a serialized message on the topic. + /** + * This signature is enabled if the element_type of the std::unique_ptr is + * a rclcpp::SerializedMessage, as opposed to the custom_type of a TypeAdapter, + * and that type matches the type given when creating the publisher. + * + * This signature allows the user to give ownership of the message to rclcpp, + * allowing for more efficient intra-process communication optimizations. + * + * \param[in] msg A unique pointer to the message to send. + */ + template + typename std::enable_if_t< + std::is_same::value && + std::is_same::value + > + publish(std::unique_ptr msg) + { + bool inter_process_publish_needed = + get_subscription_count() > get_intra_process_subscription_count(); + + if (inter_process_publish_needed) { + auto status = rcl_publish_serialized_message( + publisher_handle_.get(), &msg->get_rcl_serialized_message(), nullptr); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } + } + + if (intra_process_is_enabled_) { + this->do_intra_process_ros_message_publish(std::move(msg)); + } + } + /// Publish a message on the topic. /** * This signature is enabled if the object being published is diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..f9ca967e4d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -60,6 +60,35 @@ struct PublisherFactory const PublisherFactoryFunction create_typed_publisher; }; +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template +PublisherFactory +create_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) +{ + PublisherFactory factory { + // factory function that creates a specific PublisherT + [options, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> std::shared_ptr + { + auto publisher = std::make_shared( + node_base, topic_name, qos, options, type_support); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + publisher->post_init_setup(node_base, topic_name, qos, options); + return publisher; + } + }; + + // return the factory now that it is populated + return factory; +} + /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index d6e0f536ed..2b8964bdb0 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -153,6 +153,14 @@ class Subscription : public SubscriptionBase if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) { using rclcpp::detail::resolve_intra_process_buffer_type; + if (callback.is_serialized_message_callback() && + !serialization_traits::is_serialized_message_class::value) + { + throw std::invalid_argument( + "intraprocess communication for serialized callback " + "allowed only with rclcpp::SerializedMessage subscription type"); + } + // Check if the QoS is compatible with intra-process. auto qos_profile = get_actual_qos(); if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { @@ -325,6 +333,11 @@ class Subscription : public SubscriptionBase const std::shared_ptr & serialized_message, const rclcpp::MessageInfo & message_info) override { + if (matches_any_intra_process_publishers(&message_info.get_rmw_message_info().publisher_gid)) { + // In this case, the message will be delivered via intra process and + // we should ignore this copy of the message. + return; + } std::chrono::time_point now; if (subscription_topic_statistics_) { // get current time before executing callback to diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 0e9d9fefe5..47f80a173c 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -124,6 +124,65 @@ create_subscription_factory( // return the factory now that it is populated return factory; } +/// Return a SubscriptionFactory setup to create a SubscriptionT. +template< + typename MessageT = void, + typename CallbackT, + typename AllocatorT, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> +SubscriptionFactory +create_subscription_factory( + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support, + std::shared_ptr + subscription_topic_stats = nullptr) +{ + auto allocator = options.get_allocator(); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback any_subscription_callback(*allocator); + any_subscription_callback.set(std::forward(callback)); + + SubscriptionFactory factory { + // factory function that creates a MessageT specific SubscriptionT + [options, msg_mem_strat, any_subscription_callback, type_support, subscription_topic_stats]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> rclcpp::SubscriptionBase::SharedPtr + { + using rclcpp::Subscription; + using rclcpp::SubscriptionBase; + + auto sub = Subscription::make_shared( + node_base, + type_support, + topic_name, + qos, + any_subscription_callback, + options, + msg_mem_strat, + subscription_topic_stats); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + sub->post_init_setup(node_base, qos, options); + auto sub_base_ptr = std::dynamic_pointer_cast(sub); + return sub_base_ptr; + } + }; + + // return the factory now that it is populated + return factory; +} } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/generic_subscription.cpp b/rclcpp/src/rclcpp/generic_subscription.cpp deleted file mode 100644 index 2d2be61277..0000000000 --- a/rclcpp/src/rclcpp/generic_subscription.cpp +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// Copyright 2021, Apex.AI Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/generic_subscription.hpp" - -#include -#include - -#include "rcl/subscription.h" - -#include "rclcpp/exceptions.hpp" - -namespace rclcpp -{ - -std::shared_ptr -GenericSubscription::create_message() -{ - return create_serialized_message(); -} - -std::shared_ptr -GenericSubscription::create_serialized_message() -{ - return std::make_shared(0); -} - -void -GenericSubscription::handle_message( - std::shared_ptr &, - const rclcpp::MessageInfo &) -{ - throw rclcpp::exceptions::UnimplementedError( - "handle_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::handle_serialized_message( - const std::shared_ptr & message, - const rclcpp::MessageInfo & message_info) -{ - callback_(message, message_info); -} - -void -GenericSubscription::handle_loaned_message( - void * message, const rclcpp::MessageInfo & message_info) -{ - (void) message; - (void) message_info; - throw rclcpp::exceptions::UnimplementedError( - "handle_loaned_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::return_message(std::shared_ptr & message) -{ - auto typed_message = std::static_pointer_cast(message); - return_serialized_message(typed_message); -} - -void -GenericSubscription::return_serialized_message( - std::shared_ptr & message) -{ - message.reset(); -} - - -// DYNAMIC TYPE ==================================================================================== -// TODO(methylDragon): Reorder later -rclcpp::dynamic_typesupport::DynamicMessageType::SharedPtr -GenericSubscription::get_shared_dynamic_message_type() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_message_type is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr -GenericSubscription::get_shared_dynamic_message() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_message is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicSerializationSupport::SharedPtr -GenericSubscription::get_shared_dynamic_serialization_support() -{ - throw rclcpp::exceptions::UnimplementedError( - "get_shared_dynamic_serialization_support is not implemented for GenericSubscription"); -} - -rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr -GenericSubscription::create_dynamic_message() -{ - throw rclcpp::exceptions::UnimplementedError( - "create_dynamic_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::return_dynamic_message( - rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message) -{ - (void) message; - throw rclcpp::exceptions::UnimplementedError( - "return_dynamic_message is not implemented for GenericSubscription"); -} - -void -GenericSubscription::handle_dynamic_message( - const rclcpp::dynamic_typesupport::DynamicMessage::SharedPtr & message, - const rclcpp::MessageInfo & message_info) -{ - (void) message; - (void) message_info; - throw rclcpp::exceptions::UnimplementedError( - "handle_dynamic_message is not implemented for GenericSubscription"); -} - -} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index ea068f03ae..00f9bae4be 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -51,7 +51,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) } if (can_communicate(publisher, subscription)) { uint64_t sub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + insert_sub_id_for_pub( + sub_id, pub_id, + subscription->use_take_shared_method(), subscription->is_serialized()); } } @@ -75,7 +77,9 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su } if (can_communicate(publisher, subscription)) { uint64_t pub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + insert_sub_id_for_pub( + sub_id, pub_id, + subscription->use_take_shared_method(), subscription->is_serialized()); } } @@ -90,19 +94,14 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { - pair.second.take_shared_subscriptions.erase( - std::remove( - pair.second.take_shared_subscriptions.begin(), - pair.second.take_shared_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_shared_subscriptions.end()); - - pair.second.take_ownership_subscriptions.erase( - std::remove( - pair.second.take_ownership_subscriptions.begin(), - pair.second.take_ownership_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_ownership_subscriptions.end()); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + pair.second.take_subscriptions[i].erase( + std::remove( + pair.second.take_subscriptions[i].begin(), + pair.second.take_subscriptions[i].end(), + intra_process_subscription_id), + pair.second.take_subscriptions[i].end()); + } } } @@ -146,9 +145,10 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + size_t count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } return count; } @@ -198,13 +198,12 @@ void IntraProcessManager::insert_sub_id_for_pub( uint64_t sub_id, uint64_t pub_id, - bool use_take_shared_method) + bool use_take_shared_method, + bool is_serialized_subscriber) { - if (use_take_shared_method) { - pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); - } else { - pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); - } + pub_to_subs_[pub_id].take_subscriptions[SplittedSubscriptions::get_index( + use_take_shared_method, + is_serialized_subscriber)].push_back(sub_id); } bool @@ -239,9 +238,12 @@ IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publ return 0u; } - if (publisher_it->second.take_shared_subscriptions.empty() && - publisher_it->second.take_ownership_subscriptions.empty()) - { + size_t count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } + + if (count == 0u) { // no subscriptions available return 0u; } @@ -262,12 +264,10 @@ IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publ } }; - for (const auto sub_id : publisher_it->second.take_shared_subscriptions) { - available_capacity(sub_id); - } - - for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) { - available_capacity(sub_id); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + for (const auto sub_id : publisher_it->second.take_subscriptions[i]) { + available_capacity(sub_id); + } } return capacity; diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index b13123b65b..bd5b9c2c15 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -20,7 +20,7 @@ using rclcpp::experimental::SubscriptionIntraProcessBase; void SubscriptionIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) { - detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); + rclcpp::detail::add_guard_condition_to_rcl_wait_set(*wait_set, gc_); } const char * diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ca6c8f2adf..79cf8b4f0f 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -108,6 +108,17 @@ ament_add_gtest(test_intra_process_buffer test_intra_process_buffer.cpp) if(TARGET test_intra_process_buffer) target_link_libraries(test_intra_process_buffer ${PROJECT_NAME}) endif() +ament_add_gtest(test_intra_process_subscriber test_intra_process_subscriber.cpp) +if(TARGET test_intra_process_subscriber) + ament_target_dependencies(test_intra_process_subscriber + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_intra_process_subscriber ${PROJECT_NAME}) +endif() ament_add_gtest(test_loaned_message test_loaned_message.cpp) target_link_libraries(test_loaned_message ${PROJECT_NAME} mimick ${test_msgs_TARGETS}) diff --git a/rclcpp/test/rclcpp/test_generic_pubsub.cpp b/rclcpp/test/rclcpp/test_generic_pubsub.cpp index 79fbfcc33a..3be3b414c3 100644 --- a/rclcpp/test/rclcpp/test_generic_pubsub.cpp +++ b/rclcpp/test/rclcpp/test_generic_pubsub.cpp @@ -33,6 +33,7 @@ using namespace ::testing; // NOLINT using namespace rclcpp; // NOLINT +using namespace std::chrono_literals; class RclcppGenericNodeFixture : public Test { @@ -61,9 +62,12 @@ class RclcppGenericNodeFixture : public Test publishers_.push_back(publisher); } - template + template> std::vector subscribe_raw_messages( - size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type) + size_t expected_recv_msg_count, const std::string & topic_name, const std::string & type, + const rclcpp::SubscriptionOptionsWithAllocator & options = ( + rclcpp::SubscriptionOptionsWithAllocator() + )) { std::vector messages; size_t counter = 0; @@ -75,7 +79,7 @@ class RclcppGenericNodeFixture : public Test serializer.deserialize_message(message.get(), &deserialized_message); messages.push_back(this->get_data_from_msg(deserialized_message)); counter++; - }); + }, options); while (counter < expected_recv_msg_count) { rclcpp::spin_some(node_); @@ -84,14 +88,14 @@ class RclcppGenericNodeFixture : public Test } template - rclcpp::SerializedMessage serialize_message(const T1 & data) + std::unique_ptr serialize_message(const T1 & data) { T2 message; write_message(data, message); rclcpp::Serialization ser; - SerializedMessage result; - ser.serialize_message(&message, &result); + auto result = std::make_unique(); + ser.serialize_message(&message, result.get()); return result; } @@ -171,7 +175,7 @@ TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_work) ASSERT_TRUE(success); for (const auto & message : test_messages) { - publisher->publish(serialize_message(message)); + publisher->publish(*serialize_message(message)); } auto subscribed_messages = subscriber_future_.get(); @@ -209,7 +213,7 @@ TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) for (const auto & message : test_messages) { publisher->publish_as_loaned_msg( - serialize_message(message)); + *serialize_message(message)); } auto subscribed_messages = subscriber_future_.get(); @@ -219,7 +223,7 @@ TEST_F(RclcppGenericNodeFixture, publish_loaned_msg_work) ASSERT_THROW( { publisher->publish_as_loaned_msg( - serialize_message(test_messages[0])); + *serialize_message(test_messages[0])); }, rclcpp::exceptions::RCLError); } } @@ -266,6 +270,8 @@ TEST_F(RclcppGenericNodeFixture, generic_publisher_uses_qos) TEST_F(RclcppGenericNodeFixture, generic_subscription_different_callbacks) { + // If the GenericSubscription does not use the provided QoS profile, + // its request will be incompatible with the Publisher's offer and no messages will be passed. using namespace std::chrono_literals; std::string topic_name = "string_topic"; std::string topic_type = "test_msgs/msg/Strings"; @@ -309,3 +315,63 @@ TEST_F(RclcppGenericNodeFixture, generic_subscription_different_callbacks) ASSERT_TRUE(wait_for(connected, 5s)); } } + +TEST_F(RclcppGenericNodeFixture, publisher_and_subscriber_with_intraprocess) +{ + // We currently publish more messages because they can get lost + std::vector test_messages = {"Hello World"}; + std::string topic_name = "/string_topic"; + std::string type = "test_msgs/msg/Strings"; + + // add a dummy subscriber without ipm + auto node = std::make_shared("test_string_msg_listener_node"); + auto string_msgs_sub = node->create_subscription( + topic_name, 10, [](test_msgs::msg::Strings::ConstSharedPtr) {}); + + auto publisher_options = rclcpp::PublisherOptionsWithAllocator>(); + publisher_options.use_intra_process_comm = IntraProcessSetting::Enable; + auto ts_lib = rclcpp::get_typesupport_library( + type, "rosidl_typesupport_cpp"); + auto type_support = *rclcpp::get_typesupport_handle(type, "rosidl_typesupport_cpp", *ts_lib); + + auto publisher = + rclcpp::detail::create_publisher( + *node, topic_name, + type_support, + rclcpp::QoS(1), + publisher_options + ); + + auto subscriber_future_ = std::async( + std::launch::async, [this, topic_name, type] { + auto subscriber_options = rclcpp::SubscriptionOptionsWithAllocator>(); + subscriber_options.use_intra_process_comm = IntraProcessSetting::Enable; + return subscribe_raw_messages( + 1, topic_name, type, + subscriber_options); + }); + + // TODO(karsten1987): Port 'wait_for_sub' to rclcpp + auto allocator = node_->get_node_options().allocator(); + auto success = false; + auto ret = rcl_wait_for_subscribers( + node_->get_node_base_interface()->get_rcl_node_handle(), + &allocator, + topic_name.c_str(), + 1u, + static_cast(1e9), + &success); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_TRUE(success); + + std::this_thread::sleep_for(100ms); + + for (const auto & message : test_messages) { + publisher->publish(std::move(serialize_message(message))); + std::this_thread::sleep_for(100ms); + } + + auto subscribed_messages = subscriber_future_.get(); + EXPECT_THAT(subscribed_messages, SizeIs(1)); + EXPECT_THAT(subscribed_messages[0], StrEq("Hello World")); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index da863f3e3c..06b5a4da67 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -24,6 +24,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/context.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/qos.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -231,6 +232,24 @@ class SubscriptionIntraProcessBase return topic_name.c_str(); } + bool + is_serialized() const + { + return false; + } + + bool + serve_serialized_message( + const rclcpp::SerializedMessage * /*serialized_message*/, + IntraProcessManager * /*intraprocess_manager*/, + uint64_t /*intra_process_publisher_id*/, + void * /*untyped_allocator*/, + const std::vector & /*take_ownership_subscriptions*/, + const std::vector & /*take_shared_subscriptions*/) + { + return true; + } + virtual size_t available_capacity() const = 0; diff --git a/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp b/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp new file mode 100755 index 0000000000..5beb6f270a --- /dev/null +++ b/rclcpp/test/rclcpp/test_intra_process_subscriber.cpp @@ -0,0 +1,236 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include +#include + +#include "gtest/gtest.h" + +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/empty.hpp" + +class TestIntraProcessSubscriber : public ::testing::Test +{ +public: + void SetUp() + { + rclcpp::init(0, nullptr); + const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions(); + node = std::make_shared("my_node", "/ns", node_options); + } + + void TearDown() + { + node.reset(); + rclcpp::shutdown(); + } + + rclcpp::Node::SharedPtr node; +}; + +/* + Construtctor + */ +TEST_F(TestIntraProcessSubscriber, constructor) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + rclcpp::AnySubscriptionCallback callback; + + auto allocator = std::make_shared(); + auto subscriber = std::make_shared>( + callback, + allocator, + rclcpp::contexts::get_global_default_context(), + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + + EXPECT_NE(nullptr, subscriber); +} + +/* + Test methods of SubscriptionIntraProcess with shared callback: + * is_serialized + * deserialize_message + * serve_serialized_message + */ +TEST_F(TestIntraProcessSubscriber, methods_shared) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + // create IPM subscriber + uint32_t counter = 0u; + rclcpp::AnySubscriptionCallback callback; + callback.set( + [&counter](const std::shared_ptr &) { + ++counter; + }); + + auto allocator = std::make_shared(); + auto context = rclcpp::contexts::get_global_default_context(); + auto subscriber = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + + EXPECT_NE(nullptr, subscriber); + EXPECT_EQ(false, subscriber->is_serialized()); + + // generate a serialized message + MessageT msg; + rclcpp::SerializedMessage serialized_message; + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized_message); + + // test deserialization + auto deserialized_msg = subscriber->deserialize_message(&serialized_message); + EXPECT_EQ(msg, *deserialized_msg); + + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + const auto sub_id = ipm->add_subscription(subscriber); + auto publisher = node->create_publisher("topic", 42); + const auto intra_process_publisher_id = ipm->add_publisher(publisher); + + std::vector take_ownership_subscriptions; + std::vector take_shared_subscriptions = {sub_id}; + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscriber callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(1u, counter); + + // add a 2nd shared subscriber to cover more code + auto subscriber2 = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::SharedPtr); + const auto sub2_id = ipm->add_subscription(subscriber2); + take_shared_subscriptions.push_back(sub2_id); + + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscribers callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(2u, counter); + + { + auto data = subscriber2->take_data(); + subscriber2->execute(data); + } + + // check if message was received + EXPECT_EQ(3u, counter); +} + +/* + Test methods of SubscriptionIntraProcess with unique callback: + * is_serialized + * deserialize_message + * serve_serialized_message + */ +TEST_F(TestIntraProcessSubscriber, methods_unique) { + using MessageT = test_msgs::msg::Empty; + using Alloc = std::allocator; + + // create IPM subscriber + uint32_t counter = 0u; + rclcpp::AnySubscriptionCallback callback; + callback.set( + [&counter](std::unique_ptr) { + ++counter; + }); + + auto allocator = std::make_shared(); + auto context = rclcpp::contexts::get_global_default_context(); + auto subscriber = std::make_shared>( + callback, + allocator, + context, + "topic", + rclcpp::SystemDefaultsQoS().keep_last(10), + rclcpp::IntraProcessBufferType::UniquePtr); + + EXPECT_NE(nullptr, subscriber); + EXPECT_EQ(false, subscriber->is_serialized()); + + // generate a serialized message + MessageT msg; + rclcpp::SerializedMessage serialized_message; + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized_message); + + // test deserialization + auto deserialized_msg = subscriber->deserialize_message(&serialized_message); + EXPECT_EQ(msg, *deserialized_msg); + + // Get the intra process manager instance for this context. + auto ipm = context->get_sub_context(); + const auto sub_id = ipm->add_subscription(subscriber); + auto publisher = node->create_publisher("topic", 42); + const auto intra_process_publisher_id = ipm->add_publisher(publisher); + + std::vector take_ownership_subscriptions = {sub_id}; + std::vector take_shared_subscriptions; + subscriber->serve_serialized_message( + &serialized_message, + ipm.get(), + intra_process_publisher_id, + reinterpret_cast(allocator.get()), + take_ownership_subscriptions, + take_shared_subscriptions); + + // execute subscriber callback + { + auto data = subscriber->take_data(); + subscriber->execute(data); + } + + // check if message was received + EXPECT_EQ(1u, counter); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 3edb24c0dd..55b0ffc893 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -312,6 +312,23 @@ TEST_F(TestPublisher, serialized_message_publish) { EXPECT_NO_THROW(publisher->publish(serialized_msg.get_rcl_serialized_message())); } +TEST_F(TestPublisher, serialized_message_intra_process_publish) { + initialize(); + rclcpp::PublisherOptionsWithAllocator> options; + options.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + auto publisher = rclcpp::detail::create_publisher( + *node, "topic", + rclcpp::get_message_type_support_handle(), 10, options); + + rclcpp::SerializedMessage serialized_msg; + RCLCPP_EXPECT_THROW_EQ( + publisher->publish(serialized_msg), + std::runtime_error("storing serialized messages in intra process is not supported yet")); + + auto serialized_unique_msg_ptr = std::make_unique(); + EXPECT_NO_THROW(publisher->publish(std::move(serialized_unique_msg_ptr))); +} + TEST_F(TestPublisher, rcl_publisher_init_error) { initialize(); auto mock = mocking_utils::patch_and_return("lib:rclcpp", rcl_publisher_init, RCL_RET_ERROR); diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 867171848c..5263531551 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -672,3 +672,73 @@ TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws_intrapro options);}, std::runtime_error("Unrecognized IntraProcessSetting value")); } + +/* + Testing on_new_intra_process_message callbacks. + */ +TEST_F(TestSubscription, on_new_intra_process_serialized_message_callback) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + + auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto sub = + node_->create_generic_subscription("~/test_take", "test_msgs/msg/Empty", 10, do_nothing); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c1_cb); + + auto pub = node_->create_publisher("~/test_take", 1); + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c2_cb); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + sub->clear_on_new_intra_process_message_callback(); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + pub->publish(msg); + pub->publish(msg); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument); +}