diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index d679e402e5..37f0bd398b 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -55,6 +55,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn std::shared_ptr read_next() override { + // filter_ was considered when incrementing num_read_ in has_next() return messages_[num_read_++]; } diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index f4e57cb577..74bcdac77c 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -197,78 +197,98 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) serialize_test_message("topic2", 750, complex_message1), serialize_test_message("topic2", 950, complex_message1)}; - auto prepared_mock_reader = std::make_unique(); - prepared_mock_reader->prepare(messages, topic_types); - reader_ = std::make_unique(std::move(prepared_mock_reader)); + // Filter allows /topic2, blocks /topic1 + { + play_options_.topics_to_filter = {"topic2"}; - // Set filter - rosbag2_storage::StorageFilter storage_filter; - storage_filter.topics.push_back("topic2"); - reader_->set_filter(storage_filter); + // SubscriptionManager has to be recreated for every unique test + // If it isn't, message counts accumulate + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 0); + sub_->add_subscription("/topic2", 2); - // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber - // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. - sub_->add_subscription("/topic1", 2); - sub_->add_subscription("/topic2", 2); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); - auto await_received_messages = sub_->spin_subscriptions(); + auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Rosbag2Transport rosbag2_transport(reader_, writer_); + rosbag2_transport.play(storage_options_, play_options_); - await_received_messages.get(); + await_received_messages.get(); - auto replayed_test_primitives = sub_->get_received_messages( - "/topic1"); - EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(0u))); + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_topic1, SizeIs(0u)); - auto replayed_test_arrays = sub_->get_received_messages( - "/topic2"); - EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic2, SizeIs(Ge(1u))); + } - // Set new filter - auto prepared_mock_reader2 = std::make_unique(); - reader_.reset(); - reader_ = std::make_unique(std::move(prepared_mock_reader2)); - storage_filter.topics.clear(); - storage_filter.topics.push_back("topic1"); - reader_->set_filter(storage_filter); + // Filter allows /topic1, blocks /topic2 + { + play_options_.topics_to_filter = {"topic1"}; - await_received_messages = sub_->spin_subscriptions(); + // SubscriptionManager has to be recreated for every unique test + // otherwise counts accumulate, returning the spin immediately + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 2); + sub_->add_subscription("/topic2", 0); - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); - await_received_messages.get(); + auto await_received_messages = sub_->spin_subscriptions(); - replayed_test_primitives = sub_->get_received_messages( - "/topic1"); - EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + Rosbag2Transport rosbag2_transport(reader_, writer_); + rosbag2_transport.play(storage_options_, play_options_); - replayed_test_arrays = sub_->get_received_messages( - "/topic2"); - EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(0u))); + await_received_messages.get(); - // Reset filter - auto prepared_mock_reader3 = std::make_unique(); - reader_.reset(); - reader_ = std::make_unique(std::move(prepared_mock_reader3)); - reader_->reset_filter(); + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic1, SizeIs(Ge(1u))); - await_received_messages = sub_->spin_subscriptions(); + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_topic2, SizeIs(0u)); + } - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + // No filter, receive both topics + { + play_options_.topics_to_filter.clear(); - await_received_messages.get(); + // SubscriptionManager has to be recreated for every unique test + // otherwise counts accumulate, returning the spin immediately + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 2); + sub_->add_subscription("/topic2", 2); - replayed_test_primitives = sub_->get_received_messages( - "/topic1"); - EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); - replayed_test_arrays = sub_->get_received_messages( - "/topic2"); - EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + auto await_received_messages = sub_->spin_subscriptions(); + + auto rosbag2_transport = Rosbag2Transport(reader_, writer_); + rosbag2_transport.play(storage_options_, play_options_); + + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic1, SizeIs(Ge(1u))); + + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic2, SizeIs(Ge(1u))); + } } TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_with_unknown_type)