diff --git a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp index 84bf73ca7..4e4747989 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp @@ -39,6 +39,40 @@ class PublisherManager publisher_nodes_.clear(); } + /** + * Spin up a Node and publish message to topic_name message_count times at publish_rate. + * The Node's scope is tied to the scope of this method. + * The message may publish less than message_count times if rclcpp encountered an error. + * + * \tparam T the type of message to send. + * \param topic_name is the name of the topic to publish to. + * \param message is the message to publish. + * \param publish_rate is the rate to publish the message + * \param message_count is the number of times to publish the message. + */ + template + void run_scoped_publisher( + const std::string & topic_name, + const std::shared_ptr message, + const std::chrono::milliseconds publish_rate, + const int message_count) + { + std::stringstream node_name; + node_name << "publisher" << counter_++; + + auto publisher_node = std::make_shared( + node_name.str(), + rclcpp::NodeOptions().start_parameter_event_publisher(false)); + + auto publisher = publisher_node->create_publisher(topic_name, 10); + rclcpp::WallRate rate{publish_rate}; + + for (int i = 0; rclcpp::ok() && i < message_count; ++i) { + publisher->publish(*message); + rate.sleep(); + } + } + template void add_publisher( const std::string & topic_name, std::shared_ptr message, size_t expected_messages = 0) diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index 019677fd9..583661005 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -14,14 +14,46 @@ #include +#include #include +#include +#include "rclcpp/rclcpp.hpp" +#include "rcpputils/filesystem_helper.hpp" +#include "rcutils/filesystem.h" #include "rosbag2_storage/metadata_io.hpp" - #include "rosbag2_test_common/process_execution_helpers.hpp" #include "record_fixture.hpp" +namespace +{ +/** + * Construct an instance of test_msgs::msg::Strings populated with the base_message repeated until it fits the requested size. + * + * \param base_message is the String to repeat. + * \param max_message_size_bytes is the size of the message in bytes. + * \return an instance of test_msgs::msg::Strings that is the requested size. + */ +std::shared_ptr create_string_message( + const std::string & base_message, + const int max_message_size_bytes) +{ + const auto base_message_size = base_message.size() * sizeof(std::string::value_type); + const auto iterations = max_message_size_bytes / static_cast(base_message_size); + + std::stringstream message_str; + for (int i = 0; i < iterations; ++i) { + message_str << base_message; + } + + auto message = get_messages_strings()[0]; + message->string_value = message_str.str(); + + return message; +} +} // namespace + TEST_F(RecordFixture, record_end_to_end_test) { auto message = get_messages_strings()[0]; message->string_value = "test"; @@ -70,6 +102,250 @@ TEST_F(RecordFixture, record_end_to_end_test) { EXPECT_THAT(wrong_topic_messages, IsEmpty()); } +// TODO(zmichaels11): Fix and enable this test on Windows. +// This tests depends on the ability to read the metadata file. +// Stopping the process on Windows does a hard kill and the metadata file is not written. +#ifndef _WIN32 +TEST_F(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_topics) { + constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB. + std::stringstream command; + command << "ros2 bag record" << + " --output " << root_bag_path_ << + " --max-bag-size " << bagfile_split_size << + " -a"; + auto process_handle = start_execution(command.str()); + wait_for_db(); + + constexpr const char first_topic_name[] = "/test_topic0"; + constexpr const char second_topic_name[] = "/test_topic1"; + constexpr const int expected_splits = 4; + { + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 1024 * 1024; // 1MB + const auto message = create_string_message(message_str, message_size); + constexpr const int message_count = bagfile_split_size * expected_splits / message_size; + constexpr const int message_batch_size = message_count / 2; + + pub_man_.run_scoped_publisher( + first_topic_name, + message, + 50ms, + message_batch_size); + + pub_man_.run_scoped_publisher( + second_topic_name, + message, + 50ms, + message_batch_size); + } + + stop_execution(process_handle); + + rosbag2_storage::MetadataIo metadataIo; + const auto metadata = metadataIo.read_metadata(root_bag_path_); + // Verify at least 2 topics are in the metadata. + // There may be more if the test system is noisy. + EXPECT_GT(metadata.topics_with_message_count.size(), 1u); + + // Transform the topic_names_with_message_count into an unordered set of topic_names. + std::unordered_set topic_names; + for (const auto & topic : metadata.topics_with_message_count) { + topic_names.insert(topic.topic_metadata.name); + } + + // Verify that first_topic_name and second_topic_name are both in the metadata. + EXPECT_NE(topic_names.end(), topic_names.find(first_topic_name)); + EXPECT_NE(topic_names.end(), topic_names.find(second_topic_name)); +} +#endif + +TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least_specified_size) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB. + std::stringstream command; + command << "ros2 bag record " << + " --output " << root_bag_path_ << + " --max-bag-size " << bagfile_split_size << + " " << topic_name; + auto process_handle = start_execution(command.str()); + wait_for_db(); + + constexpr const int expected_splits = 4; + { + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 512 * 1024; // 512KB + const auto message = create_string_message(message_str, message_size); + constexpr const int message_count = bagfile_split_size * expected_splits / message_size; + + pub_man_.run_scoped_publisher( + topic_name, + message, + 50ms, + message_count); + } + + stop_execution(process_handle); + + rosbag2_storage::MetadataIo metadata_io; + +#ifdef _WIN32 + { + rosbag2_storage::BagMetadata metadata; + metadata.version = 2; + metadata.storage_identifier = "sqlite3"; + + // Loop until expected_splits in case it split or the bagfile doesn't exist. + for (int i = 0; i < expected_splits; ++i) { + std::stringstream bagfile_name; + bagfile_name << "bag_" << i << ".db3"; + + const auto bagfile_path = + (rcpputils::fs::path(root_bag_path_) / bagfile_name.str()); + + if (bagfile_path.exists()) { + metadata.relative_file_paths.push_back(bagfile_path.string()); + } else { + break; + } + } + + metadata_io.write_metadata(root_bag_path_, metadata); + } +#endif + + const auto metadata = metadata_io.read_metadata(root_bag_path_); + const auto actual_splits = static_cast(metadata.relative_file_paths.size()); + + // TODO(zmichaels11): Support reliable sync-to-disk for more accurate splits. + // The only guarantee with splits right now is that they will not occur until + // a bagfile is at least the specified max_bagfile_size. + EXPECT_GT(actual_splits, 0); + + // Don't include the last bagfile since it won't be full + for (int i = 0; i < actual_splits - 1; ++i) { + const auto bagfile_path = metadata.relative_file_paths[i]; + EXPECT_TRUE(rcpputils::fs::exists(bagfile_path)); + + const auto actual_split_size = static_cast(rcutils_get_file_size(bagfile_path.c_str())); + // Actual size is guaranteed to be >= bagfile_split size + EXPECT_LT(bagfile_split_size, actual_split_size); + } +} + +TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB. + std::stringstream command; + command << "ros2 bag record " << + " --output " << root_bag_path_ << + " --max-bag-size " << bagfile_split_size << + " " << topic_name; + auto process_handle = start_execution(command.str()); + wait_for_db(); + + { + constexpr const int message_size = 512 * 1024; // 512KB + constexpr const char message_str[] = "Test"; + const auto message = create_string_message(message_str, message_size); + // only fill the bagfile halfway + constexpr const int message_count = bagfile_split_size / message_size / 2; + + pub_man_.run_scoped_publisher( + topic_name, + message, + 50ms, + message_count); + } + + stop_execution(process_handle); + + rosbag2_storage::MetadataIo metadata_io; + +// TODO(zmichaels11): Remove when stop_execution properly SIGINT on Windows. +// This is required since stop_execution hard kills the proces on Windows, +// which prevents the metadata from being written. +#ifdef _WIN32 + { + rosbag2_storage::BagMetadata metadata; + metadata.version = 2; + metadata.storage_identifier = "sqlite3"; + + const auto bag_path = rcpputils::fs::path(root_bag_path_) / "bag_0.db3"; + + metadata.relative_file_paths = {bag_path.string()}; + metadata_io.write_metadata(root_bag_path_, metadata); + } +#endif + + const auto metadata = metadata_io.read_metadata(root_bag_path_); + + // Check that there's only 1 bagfile and that it exists. + EXPECT_EQ(1u, metadata.relative_file_paths.size()); + EXPECT_TRUE(rcpputils::fs::exists(metadata.relative_file_paths[0])); + + // Check that the next bagfile does not exist. + EXPECT_FALSE((rcpputils::fs::path(root_bag_path_) / "bag_1.db3").exists()); +} + +TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) { + constexpr const char topic_name[] = "/test_topic"; + constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB. + + std::stringstream command; + command << "ros2 bag record" << + " --output " << root_bag_path_ << + " --max-bag-size " << bagfile_split_size << + " " << topic_name; + auto process_handle = start_execution(command.str()); + wait_for_db(); + + constexpr const int expected_splits = 4; + { + constexpr const char message_str[] = "Test"; + constexpr const int message_size = 1024 * 1024; // 1MB + // string message from test_msgs + const auto message = create_string_message(message_str, message_size); + constexpr const int message_count = bagfile_split_size * expected_splits / message_size; + + pub_man_.run_scoped_publisher( + topic_name, + message, + 50ms, + message_count); + } + + stop_execution(process_handle); + + rosbag2_storage::MetadataIo metadata_io; + +// TODO(zmichaels11): Remove when stop_execution properly SIGINT on Windows. +// This is required since stop_execution hard kills the proces on Windows, +// which prevents the metadata from being written. +#ifdef _WIN32 + { + rosbag2_storage::BagMetadata metadata; + metadata.version = 2; + metadata.storage_identifier = "sqlite3"; + + for (int i = 0; i < expected_splits; ++i) { + std::stringstream bag_name; + bag_name << "bag_" << i << ".db3"; + + const auto bag_path = rcpputils::fs::path(root_bag_path_) / bag_name.str(); + + metadata.relative_file_paths.push_back(bag_path.string()); + } + metadata_io.write_metadata(root_bag_path_, metadata); + } +#endif + + const auto metadata = metadata_io.read_metadata(root_bag_path_); + + for (const auto & path : metadata.relative_file_paths) { + EXPECT_TRUE(rcpputils::fs::exists(path)); + } +} + TEST_F(RecordFixture, record_fails_gracefully_if_bag_already_exists) { auto database_path = _SRC_RESOURCES_DIR_PATH; // variable defined in CMakeLists.txt