diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 33a262199d..46684ac7ad 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -79,8 +79,9 @@ class SubscriptionManager { return async( std::launch::async, [this]() { - while (continue_spinning(expected_topics_with_size_)) { - rclcpp::spin_some(subscriber_node_); + rclcpp::executors::SingleThreadedExecutor exec; + while (rclcpp::ok() && continue_spinning(expected_topics_with_size_)) { + exec.spin_node_some(subscriber_node_); } }); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 74bcdac77c..9ec5e51837 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -446,6 +446,7 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture } else { EXPECT_NE(result, std::future_status::timeout); } + // Have to rclcpp::shutdown here to make the spin_subscriptions async thread exit transport.shutdown(); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 81bbba4db0..78f77d60a6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -151,9 +151,14 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) // Publish messages before starting recording test_msgs::msg::Strings msg; msg.string_value = "Hello"; - for (size_t i = 0; i < num_latched_messages; i++) { - publisher_transient_local->publish(msg); - rclcpp::spin_some(publisher_node); + { + // Scope the executor so that the node is removed from it before `spin_and_wait_for` + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(publisher_node); + for (size_t i = 0; i < num_latched_messages; i++) { + publisher_transient_local->publish(msg); + exec.spin_some(); + } } start_recording({false, false, {topic}, "rmw_format", 100ms}); auto & writer = static_cast(writer_->get_implementation_handle()); diff --git a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp index 3efd326e76..3734bac674 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp @@ -50,6 +50,7 @@ class RosBag2NodeFixture : public Test publisher_node_ = std::make_shared( "publisher_node", rclcpp::NodeOptions().start_parameter_event_publisher(false)); + executor_.add_node(node_); } static void SetUpTestCase() @@ -84,7 +85,7 @@ class RosBag2NodeFixture : public Test }); while (counter < expected_messages_number) { - rclcpp::spin_some(node_); + executor_.spin_some(); } return messages; } @@ -111,13 +112,14 @@ class RosBag2NodeFixture : public Test if ((clock::now() - start) > timeout) { return false; } - rclcpp::spin_some(node_); + executor_.spin_some(); } return true; } MemoryManagement memory_management_; std::shared_ptr node_; + rclcpp::executors::SingleThreadedExecutor executor_; rclcpp::Node::SharedPtr publisher_node_; std::vector> publishers_; };