diff --git a/rosbag2_compression/CMakeLists.txt b/rosbag2_compression/CMakeLists.txt index 040e7be566..be5112df32 100644 --- a/rosbag2_compression/CMakeLists.txt +++ b/rosbag2_compression/CMakeLists.txt @@ -11,6 +11,11 @@ if(NOT CMAKE_CXX_STANDARD) set(CMAKE_CXX_STANDARD 14) endif() +# Windows supplies macros for min and max by default. We should only use min and max from stl +if(WIN32) + add_definitions(-DNOMINMAX) +endif() + if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() @@ -52,7 +57,8 @@ target_compile_definitions(${PROJECT_NAME}_zstd add_library(${PROJECT_NAME} SHARED src/rosbag2_compression/compression_options.cpp - src/rosbag2_compression/sequential_compression_reader.cpp) + src/rosbag2_compression/sequential_compression_reader.cpp + src/rosbag2_compression/sequential_compression_writer.cpp) target_include_directories(${PROJECT_NAME} PUBLIC $ @@ -136,6 +142,12 @@ if(BUILD_TESTING) target_include_directories(test_sequential_compression_reader PUBLIC include) target_link_libraries(test_sequential_compression_reader ${PROJECT_NAME}) ament_target_dependencies(test_sequential_compression_reader rosbag2_cpp) + + ament_add_gmock(test_sequential_compression_writer + test/rosbag2_compression/test_sequential_compression_writer.cpp) + target_include_directories(test_sequential_compression_writer PUBLIC include) + target_link_libraries(test_sequential_compression_writer ${PROJECT_NAME}) + ament_target_dependencies(test_sequential_compression_writer rosbag2_cpp) endif() ament_package() diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp new file mode 100644 index 0000000000..322ebd74f3 --- /dev/null +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -0,0 +1,161 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_ +#define ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_ + +#include +#include +#include +#include + +#include "rosbag2_cpp/converter.hpp" +#include "rosbag2_cpp/converter_options.hpp" +#include "rosbag2_cpp/serialization_format_converter_factory.hpp" +#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp" +#include "rosbag2_cpp/storage_options.hpp" +#include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp" + +#include "rosbag2_storage/metadata_io.hpp" +#include "rosbag2_storage/storage_factory.hpp" +#include "rosbag2_storage/storage_factory_interface.hpp" + +#include "rosbag2_compression/compression_options.hpp" + +#include "base_compressor_interface.hpp" +#include "compression_options.hpp" +#include "visibility_control.hpp" + +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_compression +{ + +class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter + : public rosbag2_cpp::writer_interfaces::BaseWriterInterface +{ +public: + explicit SequentialCompressionWriter( + const rosbag2_compression::CompressionOptions & compression_options = + rosbag2_compression::CompressionOptions()); + + SequentialCompressionWriter( + const rosbag2_compression::CompressionOptions & compression_options, + std::unique_ptr storage_factory, + std::shared_ptr converter_factory, + std::unique_ptr metadata_io); + + ~SequentialCompressionWriter() override; + + /** + * Opens a new bagfile and prepare it for writing messages. The bagfile must not exist. + * This must be called before any other function is used. + * + * \param storage_options Options to configure the storage + * \param converter_options options to define in which format incoming messages are stored + **/ + void open( + const rosbag2_cpp::StorageOptions & storage_options, + const rosbag2_cpp::ConverterOptions & converter_options) override; + + void reset() override; + + /** + * Create a new topic in the underlying storage. Needs to be called for every topic used within + * a message which is passed to write(...). + * + * \param topic_with_type name and type identifier of topic to be created + * \throws runtime_error if the Writer is not open. + */ + void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; + + /** + * Remove a new topic in the underlying storage. + * If creation of subscription fails remove the topic + * from the db (more of cleanup) + * + * \param topic_with_type name and type identifier of topic to be created + * \throws runtime_error if the Writer is not open. + */ + void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override; + + /** + * Write a message to a bagfile. The topic needs to have been created before writing is possible. + * + * \param message to be written to the bagfile + * \throws runtime_error if the Writer is not open. + */ + void write(std::shared_ptr message) override; + +protected: + /** + * Compress the most recent file and update the metadata file path. + */ + virtual void compress_last_file(); + + /** + * Checks if the compression by message option is specified and a compressor exists. + * + * If the above conditions are satisfied, compresses the serialized bag message. + * + * \param message The message to compress. + * \return True if compression occurred, false otherwise. + */ + virtual void compress_message(std::shared_ptr message); + + /** + * Initializes the compressor if a compression mode is specified. + * + * \throws std::invalid_argument if compression_options isn't supported. + */ + virtual void setup_compression(); + +private: + std::string base_folder_; + std::unique_ptr storage_factory_{}; + std::shared_ptr converter_factory_{}; + std::shared_ptr storage_{}; + std::unique_ptr metadata_io_{}; + std::unique_ptr converter_{}; + std::unique_ptr compressor_{}; + + // Used in bagfile splitting; specifies the best-effort maximum sub-section of a bagfile in bytes. + uint64_t max_bagfile_size_{rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT}; + + // Used to track topic -> message count + std::unordered_map topics_names_to_info_{}; + + rosbag2_storage::BagMetadata metadata_{}; + + rosbag2_compression::CompressionOptions compression_options_{}; + + bool should_compress_last_file_{true}; + + // Closes the current backed storage and opens the next bagfile. + void split_bagfile(); + + // Checks if the current recording bagfile needs to be split and rolled over to a new file. + bool should_split_bagfile() const; + + // Prepares the metadata by setting initial values. + void init_metadata(); + + // Record TopicInformation into metadata + void finalize_metadata(); +}; +} // namespace rosbag2_compression +#endif // ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_ diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp new file mode 100644 index 0000000000..81197b2bee --- /dev/null +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -0,0 +1,312 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rosbag2_compression/sequential_compression_writer.hpp" + +#include +#include +#include +#include +#include +#include + +#include "rcpputils/filesystem_helper.hpp" +#include "rcutils/filesystem.h" + +#include "rosbag2_compression/zstd_compressor.hpp" +#include "rosbag2_cpp/info.hpp" +#include "rosbag2_cpp/storage_options.hpp" + +#include "logging.hpp" + +namespace rosbag2_compression +{ + +namespace +{ +std::string format_storage_uri(const std::string & base_folder, uint64_t storage_count) +{ + // Right now `base_folder_` is always just the folder name for where to install the bagfile. + // The name of the folder needs to be queried in case + // SequentialWriter is opened with a relative path. + std::stringstream storage_file_name; + storage_file_name << rcpputils::fs::path(base_folder).filename().string() << "_" << storage_count; + + return (rcpputils::fs::path(base_folder) / storage_file_name.str()).string(); +} +} // namespace + +SequentialCompressionWriter::SequentialCompressionWriter( + const rosbag2_compression::CompressionOptions & compression_options) +: storage_factory_{std::make_unique()}, + converter_factory_{std::make_shared()}, + metadata_io_{std::make_unique()}, + compression_options_{compression_options} {} + +SequentialCompressionWriter::SequentialCompressionWriter( + const rosbag2_compression::CompressionOptions & compression_options, + std::unique_ptr storage_factory, + std::shared_ptr converter_factory, + std::unique_ptr metadata_io) +: storage_factory_{std::move(storage_factory)}, + converter_factory_{std::move(converter_factory)}, + metadata_io_{std::move(metadata_io)}, + compression_options_{compression_options} {} + +SequentialCompressionWriter::~SequentialCompressionWriter() +{ + reset(); +} + +void SequentialCompressionWriter::init_metadata() +{ + metadata_ = rosbag2_storage::BagMetadata{}; + metadata_.storage_identifier = storage_->get_storage_identifier(); + metadata_.starting_time = std::chrono::time_point{ + std::chrono::nanoseconds::max()}; + metadata_.relative_file_paths = {storage_->get_relative_file_path()}; + metadata_.compression_format = compression_options_.compression_format; + metadata_.compression_mode = + rosbag2_compression::compression_mode_to_string(compression_options_.compression_mode); +} + +void SequentialCompressionWriter::setup_compression() +{ + if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::NONE) { + throw std::invalid_argument{ + "SequentialCompressionWriter requires a CompressionMode that is not NONE!"}; + } + + // TODO(zmichaels11) Support additional compression formats + if (compression_options_.compression_format == "zstd") { + compressor_ = std::make_unique(); + } else { + std::stringstream err; + err << "Unsupported compression format: \"" << compression_options_.compression_format << "\""; + throw std::invalid_argument{err.str()}; + } +} + +void SequentialCompressionWriter::open( + const rosbag2_cpp::StorageOptions & storage_options, + const rosbag2_cpp::ConverterOptions & converter_options) +{ + max_bagfile_size_ = storage_options.max_bagfile_size; + base_folder_ = storage_options.uri; + + if (converter_options.output_serialization_format != + converter_options.input_serialization_format) + { + converter_ = std::make_unique(converter_options, converter_factory_); + } + + const auto storage_uri = format_storage_uri(base_folder_, 0); + storage_ = storage_factory_->open_read_write(storage_uri, storage_options.storage_id); + if (!storage_) { + throw std::runtime_error{"No storage could be initialized. Abort"}; + } + + if (storage_options.max_bagfile_size != 0 && + storage_options.max_bagfile_size < storage_->get_minimum_split_file_size()) + { + throw std::invalid_argument{ + "Invalid bag splitting size given. Please provide a different value."}; + } + + setup_compression(); + init_metadata(); +} + +void SequentialCompressionWriter::reset() +{ + if (!base_folder_.empty()) { + if (!compressor_) { + throw std::runtime_error{"Compressor was not opened!"}; + } + + // Reset may be called before initializing the compressor (ex. bad options). + // We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()). + if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE && + should_compress_last_file_) + { + try { + compress_last_file(); + } catch (const std::runtime_error & e) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what()); + } + } + finalize_metadata(); + metadata_io_->write_metadata(base_folder_, metadata_); + } + + storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory + storage_factory_.reset(); +} + +void SequentialCompressionWriter::create_topic( + const rosbag2_storage::TopicMetadata & topic_with_type) +{ + if (!storage_) { + throw std::runtime_error{"Bag is not open. Call open() before writing."}; + } + + if (converter_) { + converter_->add_topic(topic_with_type.name, topic_with_type.type); + } + + if (topics_names_to_info_.find(topic_with_type.name) == + topics_names_to_info_.end()) + { + rosbag2_storage::TopicInformation info{}; + info.topic_metadata = topic_with_type; + + const auto insert_res = topics_names_to_info_.insert( + std::make_pair(topic_with_type.name, info)); + + if (!insert_res.second) { + std::stringstream errmsg; + errmsg << "Failed to insert topic \"" << topic_with_type.name << "\"!"; + throw std::runtime_error{errmsg.str()}; + } + + storage_->create_topic(topic_with_type); + } +} + +void SequentialCompressionWriter::remove_topic( + const rosbag2_storage::TopicMetadata & topic_with_type) +{ + if (!storage_) { + throw std::runtime_error{"Bag is not open. Call open() before removing."}; + } + + if (topics_names_to_info_.erase(topic_with_type.name) > 0) { + storage_->remove_topic(topic_with_type); + } else { + std::stringstream errmsg; + errmsg << "Failed to remove the non-existing topic \"" << topic_with_type.name << "\"!"; + throw std::runtime_error{errmsg.str()}; + } +} + +void SequentialCompressionWriter::compress_last_file() +{ + if (!compressor_) { + throw std::runtime_error{"Compressor was not opened!"}; + } + + const auto & to_compress = metadata_.relative_file_paths.back(); + + ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Compressing \"" << to_compress << "\""); + + metadata_.relative_file_paths.back() = compressor_->compress_uri(to_compress); +} + +void SequentialCompressionWriter::split_bagfile() +{ + if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { + compress_last_file(); + } + + const auto storage_uri = format_storage_uri( + base_folder_, + metadata_.relative_file_paths.size()); + + storage_ = storage_factory_->open_read_write(storage_uri, metadata_.storage_identifier); + + if (!storage_) { + // Add a check to make sure reset() does not compress the file again if we couldn't load the + // storage plugin. + should_compress_last_file_ = false; + + std::stringstream errmsg; + errmsg << "Failed to rollover bagfile to new file: \"" << storage_uri << "\"!"; + throw std::runtime_error{errmsg.str()}; + } + + metadata_.relative_file_paths.push_back(storage_->get_relative_file_path()); + + // Re-register all topics since we rolled-over to a new bagfile. + for (const auto & topic : topics_names_to_info_) { + storage_->create_topic(topic.second.topic_metadata); + } +} + +void SequentialCompressionWriter::compress_message( + std::shared_ptr message) +{ + if (!compressor_) { + throw std::runtime_error{"Cannot compress message; Writer is not open!"}; + } + + compressor_->compress_serialized_bag_message(message.get()); +} + +void SequentialCompressionWriter::write( + std::shared_ptr message) +{ + if (!storage_) { + throw std::runtime_error{"Bag is not open. Call open() before writing."}; + } + + // Update the message count for the Topic. + ++topics_names_to_info_.at(message->topic_name).message_count; + + if (should_split_bagfile()) { + split_bagfile(); + } + + const auto message_timestamp = std::chrono::time_point{ + std::chrono::nanoseconds(message->time_stamp)}; + metadata_.starting_time = std::min(metadata_.starting_time, message_timestamp); + + const auto duration = message_timestamp - metadata_.starting_time; + metadata_.duration = std::max(metadata_.duration, duration); + + auto converted_message = converter_ ? converter_->convert(message) : message; + if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::MESSAGE) { + compress_message(converted_message); + } + + storage_->write(converted_message); +} + +bool SequentialCompressionWriter::should_split_bagfile() const +{ + if (max_bagfile_size_ == rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) { + return false; + } else { + return storage_->get_bagfile_size() > max_bagfile_size_; + } +} + +void SequentialCompressionWriter::finalize_metadata() +{ + metadata_.bag_size = 0; + + for (const auto & path : metadata_.relative_file_paths) { + metadata_.bag_size += rcutils_get_file_size(path.c_str()); + } + + metadata_.topics_with_message_count.clear(); + metadata_.topics_with_message_count.reserve(topics_names_to_info_.size()); + metadata_.message_count = 0; + + for (const auto & topic : topics_names_to_info_) { + metadata_.topics_with_message_count.push_back(topic.second); + metadata_.message_count += topic.second.message_count; + } +} + +} // namespace rosbag2_compression diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp new file mode 100644 index 0000000000..103ce8de86 --- /dev/null +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -0,0 +1,84 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "rosbag2_compression/compression_options.hpp" +#include "rosbag2_compression/sequential_compression_writer.hpp" + +#include "rosbag2_cpp/writer.hpp" + +#include "rosbag2_storage/bag_metadata.hpp" +#include "rosbag2_storage/topic_metadata.hpp" + +#include "../../rosbag2_cpp/test/rosbag2_cpp/mock_converter_factory.hpp" +#include "../../rosbag2_cpp/test/rosbag2_cpp/mock_metadata_io.hpp" +#include "../../rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp" +#include "../../rosbag2_cpp/test/rosbag2_cpp/mock_storage_factory.hpp" + +using namespace testing; // NOLINT + +class SequentialCompressionWriterTest : public Test +{ +public: + SequentialCompressionWriterTest() + : storage_factory_{std::make_unique>()}, + storage_{std::make_shared>()}, + converter_factory_{std::make_shared>()}, + metadata_io_{std::make_unique>()}, + storage_options_{}, + serialization_format_{"rmw_format"} + { + ON_CALL(*storage_factory_, open_read_write(_, _)).WillByDefault(Return(storage_)); + EXPECT_CALL(*storage_factory_, open_read_write(_, _)).Times(AtLeast(0)); + } + + std::unique_ptr> storage_factory_; + std::shared_ptr> storage_; + std::shared_ptr> converter_factory_; + std::unique_ptr metadata_io_; + std::unique_ptr writer_; + rosbag2_cpp::StorageOptions storage_options_; + std::string serialization_format_; +}; + + +TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format) +{ + rosbag2_compression::CompressionOptions compression_options{ + "bad_format", rosbag2_compression::CompressionMode::FILE}; + auto sequential_writer = std::make_unique( + compression_options, std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + EXPECT_THROW( + writer_->open(rosbag2_cpp::StorageOptions(), {serialization_format_, serialization_format_}), + std::invalid_argument); +} + +TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_format) +{ + rosbag2_compression::CompressionOptions compression_options{ + "zstd", rosbag2_compression::CompressionMode::FILE}; + auto sequential_writer = std::make_unique( + compression_options, std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + EXPECT_NO_THROW( + writer_->open(rosbag2_cpp::StorageOptions(), {serialization_format_, serialization_format_})); +} diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 37958930f3..5373e6443a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -17,6 +17,8 @@ #include +#include "rosbag2_cpp/converter_options.hpp" +#include "rosbag2_cpp/storage_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" #include "rosbag2_storage/serialized_bag_message.hpp"