Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,40 @@ class PublisherManager
publisher_nodes_.clear();
}

/**
* Spin up a Node and publish message to topic_name message_count times at publish_rate.
* The Node's scope is tied to the scope of this method.
* The message may publish less than message_count times if rclcpp encountered an error.
*
* \tparam T the type of message to send.
* \param topic_name is the name of the topic to publish to.
* \param message is the message to publish.
* \param publish_rate is the rate to publish the message
* \param message_count is the number of times to publish the message.
*/
template<class T>
void run_scoped_publisher(
const std::string & topic_name,
const std::shared_ptr<T> message,
const std::chrono::milliseconds publish_rate,
const int message_count)
{
std::stringstream node_name;
node_name << "publisher" << counter_++;

auto publisher_node = std::make_shared<rclcpp::Node>(
node_name.str(),
rclcpp::NodeOptions().start_parameter_event_publisher(false));

auto publisher = publisher_node->create_publisher<T>(topic_name, 10);
rclcpp::WallRate rate{publish_rate};

for (int i = 0; rclcpp::ok() && i < message_count; ++i) {
publisher->publish(*message);
rate.sleep();
}
}

template<class T>
void add_publisher(
const std::string & topic_name, std::shared_ptr<T> message, size_t expected_messages = 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,46 @@

#include <gmock/gmock.h>

#include <memory>
#include <string>
#include <unordered_set>

#include "rclcpp/rclcpp.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rcutils/filesystem.h"
#include "rosbag2_storage/metadata_io.hpp"

#include "rosbag2_test_common/process_execution_helpers.hpp"

#include "record_fixture.hpp"

namespace
{
/**
* Construct an instance of test_msgs::msg::Strings populated with the base_message repeated until it fits the requested size.
*
* \param base_message is the String to repeat.
* \param max_message_size_bytes is the size of the message in bytes.
* \return an instance of test_msgs::msg::Strings that is the requested size.
*/
std::shared_ptr<test_msgs::msg::Strings> create_string_message(
const std::string & base_message,
const int max_message_size_bytes)
{
const auto base_message_size = base_message.size() * sizeof(std::string::value_type);
const auto iterations = max_message_size_bytes / static_cast<int>(base_message_size);

std::stringstream message_str;
for (int i = 0; i < iterations; ++i) {
message_str << base_message;
}

auto message = get_messages_strings()[0];
message->string_value = message_str.str();

return message;
}
} // namespace

TEST_F(RecordFixture, record_end_to_end_test) {
auto message = get_messages_strings()[0];
message->string_value = "test";
Expand Down Expand Up @@ -70,6 +102,250 @@ TEST_F(RecordFixture, record_end_to_end_test) {
EXPECT_THAT(wrong_topic_messages, IsEmpty());
}

// TODO(zmichaels11): Fix and enable this test on Windows.
// This tests depends on the ability to read the metadata file.
// Stopping the process on Windows does a hard kill and the metadata file is not written.
#ifndef _WIN32
TEST_F(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_topics) {
constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB.
std::stringstream command;
command << "ros2 bag record" <<
" --output " << root_bag_path_ <<
" --max-bag-size " << bagfile_split_size <<
" -a";
auto process_handle = start_execution(command.str());
wait_for_db();

constexpr const char first_topic_name[] = "/test_topic0";
constexpr const char second_topic_name[] = "/test_topic1";
constexpr const int expected_splits = 4;
{
constexpr const char message_str[] = "Test";
constexpr const int message_size = 1024 * 1024; // 1MB
const auto message = create_string_message(message_str, message_size);
constexpr const int message_count = bagfile_split_size * expected_splits / message_size;
constexpr const int message_batch_size = message_count / 2;

pub_man_.run_scoped_publisher(
first_topic_name,
message,
50ms,
message_batch_size);

pub_man_.run_scoped_publisher(
second_topic_name,
message,
50ms,
message_batch_size);
}

stop_execution(process_handle);

rosbag2_storage::MetadataIo metadataIo;
const auto metadata = metadataIo.read_metadata(root_bag_path_);
// Verify at least 2 topics are in the metadata.
// There may be more if the test system is noisy.
EXPECT_GT(metadata.topics_with_message_count.size(), 1u);

// Transform the topic_names_with_message_count into an unordered set of topic_names.
std::unordered_set<std::string> topic_names;
for (const auto & topic : metadata.topics_with_message_count) {
topic_names.insert(topic.topic_metadata.name);
}

// Verify that first_topic_name and second_topic_name are both in the metadata.
EXPECT_NE(topic_names.end(), topic_names.find(first_topic_name));
EXPECT_NE(topic_names.end(), topic_names.find(second_topic_name));
}
#endif

TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least_specified_size) {
constexpr const char topic_name[] = "/test_topic";
constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB.
std::stringstream command;
command << "ros2 bag record " <<
" --output " << root_bag_path_ <<
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
wait_for_db();

constexpr const int expected_splits = 4;
{
constexpr const char message_str[] = "Test";
constexpr const int message_size = 512 * 1024; // 512KB
const auto message = create_string_message(message_str, message_size);
constexpr const int message_count = bagfile_split_size * expected_splits / message_size;

pub_man_.run_scoped_publisher(
topic_name,
message,
50ms,
message_count);
}

stop_execution(process_handle);

rosbag2_storage::MetadataIo metadata_io;

#ifdef _WIN32
{
rosbag2_storage::BagMetadata metadata;
metadata.version = 2;
metadata.storage_identifier = "sqlite3";

// Loop until expected_splits in case it split or the bagfile doesn't exist.
for (int i = 0; i < expected_splits; ++i) {
std::stringstream bagfile_name;
bagfile_name << "bag_" << i << ".db3";

const auto bagfile_path =
(rcpputils::fs::path(root_bag_path_) / bagfile_name.str());

if (bagfile_path.exists()) {
metadata.relative_file_paths.push_back(bagfile_path.string());
} else {
break;
}
}

metadata_io.write_metadata(root_bag_path_, metadata);
}
#endif

const auto metadata = metadata_io.read_metadata(root_bag_path_);
const auto actual_splits = static_cast<int>(metadata.relative_file_paths.size());

// TODO(zmichaels11): Support reliable sync-to-disk for more accurate splits.
// The only guarantee with splits right now is that they will not occur until
// a bagfile is at least the specified max_bagfile_size.
EXPECT_GT(actual_splits, 0);

// Don't include the last bagfile since it won't be full
for (int i = 0; i < actual_splits - 1; ++i) {
const auto bagfile_path = metadata.relative_file_paths[i];
EXPECT_TRUE(rcpputils::fs::exists(bagfile_path));

const auto actual_split_size = static_cast<int>(rcutils_get_file_size(bagfile_path.c_str()));
// Actual size is guaranteed to be >= bagfile_split size
EXPECT_LT(bagfile_split_size, actual_split_size);
}
}

TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) {
constexpr const char topic_name[] = "/test_topic";
constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB.
std::stringstream command;
command << "ros2 bag record " <<
" --output " << root_bag_path_ <<
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
wait_for_db();

{
constexpr const int message_size = 512 * 1024; // 512KB
constexpr const char message_str[] = "Test";
const auto message = create_string_message(message_str, message_size);
// only fill the bagfile halfway
constexpr const int message_count = bagfile_split_size / message_size / 2;

pub_man_.run_scoped_publisher(
topic_name,
message,
50ms,
message_count);
}

stop_execution(process_handle);

rosbag2_storage::MetadataIo metadata_io;

// TODO(zmichaels11): Remove when stop_execution properly SIGINT on Windows.
// This is required since stop_execution hard kills the proces on Windows,
// which prevents the metadata from being written.
#ifdef _WIN32
{
rosbag2_storage::BagMetadata metadata;
metadata.version = 2;
metadata.storage_identifier = "sqlite3";

const auto bag_path = rcpputils::fs::path(root_bag_path_) / "bag_0.db3";

metadata.relative_file_paths = {bag_path.string()};
metadata_io.write_metadata(root_bag_path_, metadata);
}
#endif

const auto metadata = metadata_io.read_metadata(root_bag_path_);

// Check that there's only 1 bagfile and that it exists.
EXPECT_EQ(1u, metadata.relative_file_paths.size());
EXPECT_TRUE(rcpputils::fs::exists(metadata.relative_file_paths[0]));

// Check that the next bagfile does not exist.
EXPECT_FALSE((rcpputils::fs::path(root_bag_path_) / "bag_1.db3").exists());
}

TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) {
constexpr const char topic_name[] = "/test_topic";
constexpr const int bagfile_split_size = 4 * 1024 * 1024; // 4MB.

std::stringstream command;
command << "ros2 bag record" <<
" --output " << root_bag_path_ <<
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
wait_for_db();

constexpr const int expected_splits = 4;
{
constexpr const char message_str[] = "Test";
constexpr const int message_size = 1024 * 1024; // 1MB
// string message from test_msgs
const auto message = create_string_message(message_str, message_size);
constexpr const int message_count = bagfile_split_size * expected_splits / message_size;

pub_man_.run_scoped_publisher(
topic_name,
message,
50ms,
message_count);
}

stop_execution(process_handle);

rosbag2_storage::MetadataIo metadata_io;

// TODO(zmichaels11): Remove when stop_execution properly SIGINT on Windows.
// This is required since stop_execution hard kills the proces on Windows,
// which prevents the metadata from being written.
#ifdef _WIN32
{
rosbag2_storage::BagMetadata metadata;
metadata.version = 2;
metadata.storage_identifier = "sqlite3";

for (int i = 0; i < expected_splits; ++i) {
std::stringstream bag_name;
bag_name << "bag_" << i << ".db3";

const auto bag_path = rcpputils::fs::path(root_bag_path_) / bag_name.str();

metadata.relative_file_paths.push_back(bag_path.string());
}
metadata_io.write_metadata(root_bag_path_, metadata);
}
#endif

const auto metadata = metadata_io.read_metadata(root_bag_path_);

for (const auto & path : metadata.relative_file_paths) {
EXPECT_TRUE(rcpputils::fs::exists(path));
}
}

TEST_F(RecordFixture, record_fails_gracefully_if_bag_already_exists) {
auto database_path = _SRC_RESOURCES_DIR_PATH; // variable defined in CMakeLists.txt

Expand Down