Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void SequentialCompressionReader::open(
ROSBAG2_COMPRESSION_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class SequentialCompressionReaderTest : public Test
ON_CALL(*storage_factory_, open_read_only(_, _)).WillByDefault(Return(storage_));
}

rosbag2_storage::BagMetadata construct_default_bag_metadata() const
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
return metadata;
}

std::unique_ptr<StrictMock<MockStorageFactory>> storage_factory_;
std::shared_ptr<NiceMock<MockStorage>> storage_;
std::shared_ptr<StrictMock<MockConverterFactory>> converter_factory_;
Expand All @@ -65,12 +76,8 @@ class SequentialCompressionReaderTest : public Test

TEST_F(SequentialCompressionReaderTest, open_throws_if_unsupported_compressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
metadata.compression_format = "bad_format";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
EXPECT_CALL(*metadata_io_, read_metadata(_)).WillRepeatedly(Return(metadata));
EXPECT_CALL(*metadata_io_, metadata_file_exists(_)).WillRepeatedly(Return(true));
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();
Expand All @@ -87,14 +94,36 @@ TEST_F(SequentialCompressionReaderTest, open_throws_if_unsupported_compressor)
std::invalid_argument);
}

TEST_F(SequentialCompressionReaderTest, returns_all_topics_and_types)
{
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));

auto decompressor = std::make_unique<NiceMock<MockDecompressor>>();
auto compression_factory = std::make_unique<StrictMock<MockCompressionFactory>>();

ON_CALL(*compression_factory, create_decompressor(_))
.WillByDefault(Return(ByMove(std::move(decompressor))));
EXPECT_CALL(*compression_factory, create_decompressor(_)).Times(1);
EXPECT_CALL(*storage_factory_, open_read_only(_, _)).Times(1);

auto compression_reader = std::make_unique<rosbag2_compression::SequentialCompressionReader>(
std::move(compression_factory),
std::move(storage_factory_),
converter_factory_,
std::move(metadata_io_));

compression_reader->open(
rosbag2_cpp::StorageOptions(), {"", storage_serialization_format_});

auto topics_and_types = compression_reader->get_all_topics_and_types();
EXPECT_FALSE(topics_and_types.empty());
}

TEST_F(SequentialCompressionReaderTest, open_supports_zstd_compressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));
auto compression_factory = std::make_unique<rosbag2_compression::CompressionFactory>();
Expand All @@ -114,12 +143,7 @@ TEST_F(SequentialCompressionReaderTest, open_supports_zstd_compressor)

TEST_F(SequentialCompressionReaderTest, reader_calls_create_decompressor)
{
rosbag2_storage::BagMetadata metadata;
metadata.relative_file_paths = {"/path/to/storage"};
metadata.topics_with_message_count.push_back({{topic_with_type_}, 1});
metadata.compression_format = "zstd";
metadata.compression_mode =
rosbag2_compression::compression_mode_to_string(rosbag2_compression::CompressionMode::FILE);
rosbag2_storage::BagMetadata metadata = construct_default_bag_metadata();
ON_CALL(*metadata_io_, read_metadata(_)).WillByDefault(Return(metadata));
ON_CALL(*metadata_io_, metadata_file_exists(_)).WillByDefault(Return(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::string & converter_serialization_format,
const std::string & storage_serialization_format);

/**
* Fill topics_metadata_ cache vector with information from metadata_
*/
virtual void fill_topics_metadata();

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<Converter> converter_{};
Expand Down
26 changes: 12 additions & 14 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,6 @@
#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"

namespace
{
void fill_topics_and_types(
const rosbag2_storage::BagMetadata & metadata,
std::vector<rosbag2_storage::TopicMetadata> & topics_and_types)
{
topics_and_types.clear();
topics_and_types.reserve(metadata.topics_with_message_count.size());
for (const auto & topic_information : metadata.topics_with_message_count) {
topics_and_types.push_back(topic_information.topic_metadata);
}
}
} // unnamed namespace

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -133,7 +120,7 @@ void SequentialReader::open(
ROSBAG2_CPP_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_and_types(metadata_, topics_metadata_);
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
Expand Down Expand Up @@ -251,5 +238,16 @@ void SequentialReader::check_converter_serialization_format(
}
}
}

void SequentialReader::fill_topics_metadata()
{
rcpputils::check_true(storage_ != nullptr, "Bag is not open. Call open() before reading.");
topics_metadata_.clear();
topics_metadata_.reserve(metadata_.topics_with_message_count.size());
for (const auto & topic_information : metadata_.topics_with_message_count) {
topics_metadata_.push_back(topic_information.topic_metadata);
}
}

} // namespace readers
} // namespace rosbag2_cpp