diff --git a/rosbag2_compression/CMakeLists.txt b/rosbag2_compression/CMakeLists.txt index cc1c44f39..1842bb4b9 100644 --- a/rosbag2_compression/CMakeLists.txt +++ b/rosbag2_compression/CMakeLists.txt @@ -60,7 +60,7 @@ install( ament_export_include_directories(include) ament_export_libraries(${PROJECT_NAME}_zstd) -ament_export_dependencies(rosbag2_storage rcutils zstd_vendor zstd) +ament_export_dependencies(rosbag2_storage rcutils zstd_vendor) if(BUILD_TESTING) find_package(ament_cmake_gmock REQUIRED) diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index a63f42053..981e93ee3 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -34,12 +34,14 @@ find_package(poco_vendor) find_package(Poco COMPONENTS Foundation) find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) +find_package(rosbag2_compression REQUIRED) find_package(rosbag2_storage REQUIRED) find_package(rosidl_generator_cpp REQUIRED) find_package(rosidl_typesupport_cpp REQUIRED) find_package(rosidl_typesupport_introspection_cpp REQUIRED) add_library(${PROJECT_NAME} SHARED + src/rosbag2_cpp/compression_options.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp src/rosbag2_cpp/reader.cpp @@ -57,6 +59,7 @@ ament_target_dependencies(${PROJECT_NAME} Poco rcpputils rcutils + rosbag2_compression rosbag2_storage rosidl_generator_cpp rosidl_typesupport_introspection_cpp @@ -88,7 +91,7 @@ install( ament_export_include_directories(include) ament_export_libraries(${PROJECT_NAME}) -ament_export_dependencies(pluginlib rosbag2_storage rosidl_typesupport_introspection_cpp) +ament_export_dependencies(pluginlib rosbag2_compression rosbag2_storage rosidl_typesupport_introspection_cpp) if(BUILD_TESTING) find_package(ament_cmake_gmock REQUIRED) @@ -109,6 +112,13 @@ if(BUILD_TESTING) RUNTIME DESTINATION bin) pluginlib_export_plugin_description_file(rosbag2_cpp test/rosbag2_cpp/converter_test_plugin.xml) + ament_add_gmock(test_compression_options + test/rosbag2_cpp/test_compression_options.cpp) + if(TARGET test_compression_options) + target_include_directories(test_compression_options PRIVATE include) + target_link_libraries(test_compression_options ${PROJECT_NAME}) + endif() + ament_add_gmock(test_converter_factory test/rosbag2_cpp/test_converter_factory.cpp) if(TARGET test_converter_factory) @@ -156,6 +166,7 @@ if(BUILD_TESTING) ament_index_cpp Poco rcutils + rosbag2_compression rosbag2_storage rosidl_generator_cpp rosidl_typesupport_introspection_cpp diff --git a/rosbag2_cpp/include/rosbag2_cpp/compression_options.hpp b/rosbag2_cpp/include/rosbag2_cpp/compression_options.hpp new file mode 100644 index 000000000..0f8d223f6 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/compression_options.hpp @@ -0,0 +1,54 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_ +#define ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_ + +#include + +#include "visibility_control.hpp" + +namespace rosbag2_cpp +{ + +/** + * Modes are used to specify whether to compress by individual serialized bag messages or by file. + * rosbag2_cpp defaults to NONE. + */ +enum class ROSBAG2_CPP_PUBLIC CompressionMode : uint32_t +{ + NONE = 0, + FILE, + MESSAGE, + LAST_MODE = MESSAGE +}; + +/** + * Converts a string into a rosbag2_cpp::CompressionMode enum. + * + * \param compression_mode A case insensitive string that is either "FILE" or "MESSAGE". + * \return CompressionMode NONE if compression_mode is invalid. FILE or MESSAGE otherwise. + */ +ROSBAG2_CPP_PUBLIC CompressionMode compression_mode_from_string(const std::string & compression_mode); + +/** + * Converts a rosbag2_cpp::CompressionMode enum into a string. + * + * \param compression_mode A CompressionMode enum. + * \return The corresponding mode as a string. + */ +ROSBAG2_CPP_PUBLIC std::string compression_mode_to_string(CompressionMode compression_mode); + +} // namespace rosbag2_cpp +#endif // ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index f6586a97b..8895c02a2 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -19,12 +19,14 @@ #include #include +#include "rosbag2_cpp/compression_options.hpp" #include "rosbag2_cpp/converter.hpp" #include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp" #include "rosbag2_cpp/serialization_format_converter_factory.hpp" #include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp" #include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_compression/base_decompressor_interface.hpp" #include "rosbag2_storage/metadata_io.hpp" #include "rosbag2_storage/storage_factory.hpp" #include "rosbag2_storage/storage_factory_interface.hpp" @@ -121,14 +123,45 @@ class ROSBAG2_CPP_PUBLIC SequentialReader const std::string & converter_serialization_format, const std::string & storage_serialization_format); + /** + * Opens a storage plugin for read only. + * + * \throws std::runtime_error If no storage could be initialized. + */ + virtual void open_storage(); + + /** + * Initializes the decompressor if a compression mode is specified in the metadata. + * + * \throws std::runtime_error If compression format doesn't exist. + */ + virtual void setup_compression(); + std::unique_ptr storage_factory_{}; std::shared_ptr converter_factory_{}; std::shared_ptr storage_{}; std::unique_ptr converter_{}; + std::unique_ptr decompressor_{}; std::unique_ptr metadata_io_{}; rosbag2_storage::BagMetadata metadata_{}; std::vector file_paths_{}; // List of database files. std::vector::iterator current_file_iterator_{}; // Index of file to read from + rosbag2_cpp::CompressionMode compression_mode_{rosbag2_cpp::CompressionMode::NONE}; + +protected: + /** + * Checks if the compression mode is of type MESSAGE and if so, decompresses the message. + * + * \param message A serialized bag message + */ + virtual void decompress_message(rosbag2_storage::SerializedBagMessage * message); + + /** + * Checks if the compression mode is of type FILE and if so, decompresses the file. + * + * \param uri Relative path as a string + */ + virtual void decompress_file(const std::string & uri); }; } // namespace readers diff --git a/rosbag2_cpp/package.xml b/rosbag2_cpp/package.xml index 8aef0f6ba..95055ab69 100644 --- a/rosbag2_cpp/package.xml +++ b/rosbag2_cpp/package.xml @@ -13,6 +13,7 @@ pluginlib poco_vendor rcutils + rosbag2_compression rosbag2_storage rosidl_generator_cpp rosidl_typesupport_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/compression_options.cpp b/rosbag2_cpp/src/rosbag2_cpp/compression_options.cpp new file mode 100644 index 000000000..636640350 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/compression_options.cpp @@ -0,0 +1,69 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rosbag2_cpp/compression_options.hpp" +#include "rosbag2_cpp/logging.hpp" + +namespace rosbag2_cpp +{ + +namespace +{ + +constexpr const char kCompressionModeNoneStr[] = "NONE"; +constexpr const char kCompressionModeFileStr[] = "FILE"; +constexpr const char kCompressionModeMessageStr[] = "MESSAGE"; + +std::string to_upper(const std::string & text) +{ + std::string uppercase_text = text; + std::transform(uppercase_text.begin(), uppercase_text.end(), uppercase_text.begin(), ::toupper); + return uppercase_text; +} +} // namespace + +CompressionMode compression_mode_from_string(const std::string & compression_mode) +{ + const auto compression_mode_upper = to_upper(compression_mode); + if (compression_mode.empty() || compression_mode_upper == kCompressionModeNoneStr) { + return CompressionMode::NONE; + } else if (compression_mode_upper == kCompressionModeFileStr) { + return CompressionMode::FILE; + } else if (compression_mode_upper == kCompressionModeMessageStr) { + return CompressionMode::MESSAGE; + } else { + ROSBAG2_CPP_LOG_ERROR_STREAM( + "CompressionMode: \"" << compression_mode << "\" is not supported!"); + return CompressionMode::NONE; + } +} + +std::string compression_mode_to_string(const CompressionMode compression_mode) +{ + switch (compression_mode) { + case CompressionMode::NONE: + return kCompressionModeNoneStr; + case CompressionMode::FILE: + return kCompressionModeFileStr; + case CompressionMode::MESSAGE: + return kCompressionModeMessageStr; + default: + ROSBAG2_CPP_LOG_ERROR_STREAM("CompressionMode not supported!"); + return kCompressionModeNoneStr; + } +} +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 652ea49a2..25e3a1a8c 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -19,6 +19,7 @@ #include #include "rcpputils/filesystem_helper.hpp" +#include "rosbag2_compression/zstd_decompressor.hpp" #include "rosbag2_cpp/logging.hpp" #include "rosbag2_cpp/readers/sequential_reader.hpp" @@ -48,6 +49,30 @@ void SequentialReader::reset() storage_.reset(); } +void SequentialReader::open_storage() +{ + storage_ = storage_factory_->open_read_only( + *current_file_iterator_, metadata_.storage_identifier); + if (!storage_) { + throw std::runtime_error{"No storage could be initialized. Abort"}; + } +} + +void SequentialReader::setup_compression() +{ + compression_mode_ = compression_mode_from_string(metadata_.compression_mode); + if (compression_mode_ != rosbag2_cpp::CompressionMode::NONE) { + // TODO(piraka9011): Replace this with a compressor_factory. + if (metadata_.compression_format == "zstd") { + decompressor_ = std::make_unique(); + } else { + std::stringstream err; + err << "Unsupported compression format " << metadata_.compression_format; + throw std::runtime_error(err.str()); + } + } +} + void SequentialReader::open( const StorageOptions & storage_options, const ConverterOptions & converter_options) { @@ -60,21 +85,12 @@ void SequentialReader::open( ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata."); return; } - + open_storage(); + setup_compression(); file_paths_ = metadata_.relative_file_paths; current_file_iterator_ = file_paths_.begin(); - - storage_ = storage_factory_->open_read_only( - *current_file_iterator_, metadata_.storage_identifier); - if (!storage_) { - throw std::runtime_error("No storage could be initialized. Abort"); - } } else { - storage_ = storage_factory_->open_read_only( - storage_options.uri, storage_options.storage_id); - if (!storage_) { - throw std::runtime_error("No storage could be initialized. Abort"); - } + open_storage(); metadata_ = storage_->get_metadata(); if (metadata_.relative_file_paths.empty()) { ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata."); @@ -112,10 +128,19 @@ bool SequentialReader::has_next() throw std::runtime_error("Bag is not open. Call open() before reading."); } +void SequentialReader::decompress_message( + rosbag2_storage::SerializedBagMessage * message) +{ + if (compression_mode_ == CompressionMode::MESSAGE) { + decompressor_->decompress_serialized_bag_message(message); + } +} + std::shared_ptr SequentialReader::read_next() { if (storage_) { auto message = storage_->read_next(); + decompress_message(message.get()); return converter_ ? converter_->convert(message) : message; } throw std::runtime_error("Bag is not open. Call open() before reading."); @@ -134,10 +159,19 @@ bool SequentialReader::has_next_file() const return current_file_iterator_ + 1 != file_paths_.end(); } +void SequentialReader::decompress_file(const std::string & uri) +{ + if (compression_mode_ == CompressionMode::FILE) { + ROSBAG2_CPP_LOG_DEBUG_STREAM("Decompressing " << uri); + *current_file_iterator_ = decompressor_->decompress_uri(uri); + } +} + void SequentialReader::load_next_file() { assert(current_file_iterator_ != file_paths_.end()); current_file_iterator_++; + decompress_file(*current_file_iterator_); } std::string SequentialReader::get_current_file() const diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_compression_options.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_compression_options.cpp new file mode 100644 index 000000000..c48d111db --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_compression_options.cpp @@ -0,0 +1,84 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include "rosbag2_cpp/compression_options.hpp" + +TEST(CompressionOptionsFromStringTest, BadInputReturnsNoneMode) +{ + const std::string compression_mode_string{"bad_mode"}; + const auto compression_mode = rosbag2_cpp::compression_mode_from_string(compression_mode_string); + EXPECT_EQ(compression_mode, rosbag2_cpp::CompressionMode::NONE); +} + +TEST(CompressionOptionsFromStringTest, EmptyInputReturnsNoneMode) +{ + const std::string compression_mode_string; + const auto compression_mode = rosbag2_cpp::compression_mode_from_string(compression_mode_string); + EXPECT_EQ(compression_mode, rosbag2_cpp::CompressionMode::NONE); +} + +TEST(CompressionOptionsFromStringTest, FileStringReturnsFileMode) +{ + const std::string compression_mode_string{"file"}; + const auto compression_mode = rosbag2_cpp::compression_mode_from_string(compression_mode_string); + EXPECT_EQ(compression_mode, rosbag2_cpp::CompressionMode::FILE); +} + +TEST(CompressionOptionsFromStringTest, MixedCaseMessageStringReturnsMessageMode) +{ + const std::string compression_mode_string{"MeSsAgE"}; + const auto compression_mode = rosbag2_cpp::compression_mode_from_string(compression_mode_string); + EXPECT_EQ(compression_mode, rosbag2_cpp::CompressionMode::MESSAGE); +} + +TEST(CompressionOptionsFromStringTest, MessageStringReturnsMessageMode) +{ + const std::string compression_mode_string{"MESSAGE"}; + const auto compression_mode = rosbag2_cpp::compression_mode_from_string(compression_mode_string); + EXPECT_EQ(compression_mode, rosbag2_cpp::CompressionMode::MESSAGE); +} + +TEST(CompressionOptionsToStringTest, BadModeReturnsNoneString) +{ + // Get an out of bounds enum from CompressionMode + const auto compression_mode = static_cast( + static_cast(rosbag2_cpp::CompressionMode::LAST_MODE) + 1); + const auto compression_mode_string = rosbag2_cpp::compression_mode_to_string(compression_mode); + EXPECT_EQ(compression_mode_string, "NONE"); +} + +TEST(CompressionOptionsToStringTest, MessageModeReturnsMessageString) +{ + const auto compression_mode = rosbag2_cpp::CompressionMode::MESSAGE; + const auto compression_mode_string = rosbag2_cpp::compression_mode_to_string(compression_mode); + EXPECT_EQ(compression_mode_string, "MESSAGE"); +} + +TEST(CompressionOptionsToStringTest, FileModeReturnsFileString) +{ + const auto compression_mode = rosbag2_cpp::CompressionMode::FILE; + const auto compression_mode_string = rosbag2_cpp::compression_mode_to_string(compression_mode); + EXPECT_EQ(compression_mode_string, "FILE"); +} + +TEST(CompressionOptionsToStringTest, NoneModeReturnsNoneString) +{ + const auto compression_mode = rosbag2_cpp::CompressionMode::NONE; + const auto compression_mode_string = rosbag2_cpp::compression_mode_to_string(compression_mode); + EXPECT_EQ(compression_mode_string, "NONE"); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp index a200d91e6..5d144ee9c 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp @@ -19,6 +19,7 @@ #include #include +#include "rosbag2_cpp/compression_options.hpp" #include "rosbag2_cpp/reader.hpp" #include "rosbag2_cpp/readers/sequential_reader.hpp" @@ -39,19 +40,16 @@ class MultifileReaderTest : public Test : storage_(std::make_shared>()), converter_factory_(std::make_shared>()), storage_serialization_format_("rmw1_format"), - relative_path_1_("some_relativ_path_1"), + relative_path_1_("some_relative_path_1"), relative_path_2_("some_relative_path_2") { - rosbag2_storage::TopicMetadata topic_with_type; - topic_with_type.name = "topic"; - topic_with_type.type = "test_msgs/BasicTypes"; - topic_with_type.serialization_format = storage_serialization_format_; + auto topic_with_type = rosbag2_storage::TopicMetadata{ + "topic", "test_msgs/BasicTypes", storage_serialization_format_}; auto topics_and_types = std::vector{topic_with_type}; auto message = std::make_shared(); message->topic_name = topic_with_type.name; - auto storage_factory = std::make_unique>(); auto metadata_io = std::make_unique>(); rosbag2_storage::BagMetadata metadata; metadata.relative_file_paths.push_back(relative_path_1_); @@ -60,6 +58,7 @@ class MultifileReaderTest : public Test ON_CALL(*metadata_io, read_metadata(_)).WillByDefault(Return(metadata)); EXPECT_CALL(*metadata_io, metadata_file_exists(_)).WillRepeatedly(Return(true)); + auto storage_factory = std::make_unique>(); EXPECT_CALL(*storage_, get_all_topics_and_types()) .Times(AtMost(1)).WillRepeatedly(Return(topics_and_types)); ON_CALL(*storage_, read_next()).WillByDefault(Return(message)); @@ -114,3 +113,95 @@ TEST_F(MultifileReaderTest, get_all_topics_and_types_throws_if_no_storage) { EXPECT_ANY_THROW(reader_->get_all_topics_and_types()); } + +class FakeSequentialReader : public rosbag2_cpp::readers::SequentialReader +{ +public: + FakeSequentialReader( + std::unique_ptr storage_factory, + std::shared_ptr converter_factory, + std::unique_ptr metadata_io) + : SequentialReader(std::move(storage_factory), + std::move(converter_factory), + std::move(metadata_io)) {} + + void decompress_message(rosbag2_storage::SerializedBagMessage *) override + { + decompress_message_call_counter++; + } + + void decompress_file(const std::string &) override + { + decompress_file_call_counter++; + } + + int decompress_message_call_counter = 0; + int decompress_file_call_counter = 0; +}; + +class ReaderCompressionTest : public Test +{ +public: + ReaderCompressionTest() + : storage_(std::make_shared>()), + storage_factory_(std::make_unique>()), + converter_factory_(std::make_shared>()), + metadata_io_(std::make_unique>()), + serialization_format_("rmw1_format") + { + topic_metadata_ = + rosbag2_storage::TopicMetadata{"test", "test_msgs/BasicTypes", serialization_format_}; + auto message = std::make_shared(); + message->topic_name = topic_metadata_.name; + ON_CALL(*storage_, read_next()).WillByDefault(Return(message)); + EXPECT_CALL(*storage_factory_, open_read_only(_, _)).WillRepeatedly(Return(storage_)); + EXPECT_CALL(*metadata_io_, metadata_file_exists(_)).WillRepeatedly(Return(true)); + } + + std::shared_ptr> storage_; + std::unique_ptr> storage_factory_; + std::shared_ptr> converter_factory_; + std::unique_ptr> metadata_io_; + rosbag2_storage::TopicMetadata topic_metadata_; + std::string serialization_format_; +}; + +TEST_F(ReaderCompressionTest, open_throws_on_bad_compression_format) { + rosbag2_storage::BagMetadata metadata; + metadata.relative_file_paths = {"some_path"}; + metadata.compression_mode = + rosbag2_cpp::compression_mode_to_string(rosbag2_cpp::CompressionMode::FILE); + metadata.compression_format = "bad_format"; + EXPECT_CALL(*metadata_io_, read_metadata(_)).WillRepeatedly(Return(metadata)); + + auto sequential_reader = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + + EXPECT_THROW( + sequential_reader->open(rosbag2_cpp::StorageOptions(), {"", serialization_format_}), + std::runtime_error); +} + +TEST_F(ReaderCompressionTest, decompress_if_metadata_has_file_compression) { + rosbag2_storage::BagMetadata metadata; + metadata.relative_file_paths = {"some_relative_path_1", "some_relative_path_2"}; + metadata.topics_with_message_count.push_back({{topic_metadata_}, 1}); + metadata.compression_mode = + rosbag2_cpp::compression_mode_to_string(rosbag2_cpp::CompressionMode::FILE); + metadata.compression_format = "zstd"; + EXPECT_CALL(*metadata_io_, read_metadata(_)).WillRepeatedly(Return(metadata)); + + auto fake_sequential_reader = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + + // storage::has_next() is called twice when reader::has_next() is called + EXPECT_CALL(*storage_, has_next()).Times(4) + .WillOnce(Return(true)).WillOnce(Return(true)) // We have a message + .WillOnce(Return(false)) // No message, load next file + .WillOnce(Return(true)); + fake_sequential_reader->open(rosbag2_cpp::StorageOptions(), {"", serialization_format_}); + fake_sequential_reader->has_next(); + fake_sequential_reader->read_next(); + fake_sequential_reader->has_next(); + EXPECT_GT(fake_sequential_reader->decompress_file_call_counter, 0); +}