Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ install(

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME}_zstd)
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor zstd)
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor)

if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
Expand Down
13 changes: 12 additions & 1 deletion rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ find_package(poco_vendor)
find_package(Poco COMPONENTS Foundation)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rosidl_generator_cpp REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)

add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/compression_options.cpp
src/rosbag2_cpp/converter.cpp
src/rosbag2_cpp/info.cpp
src/rosbag2_cpp/reader.cpp
Expand All @@ -57,6 +59,7 @@ ament_target_dependencies(${PROJECT_NAME}
Poco
rcpputils
rcutils
rosbag2_compression
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
Expand Down Expand Up @@ -88,7 +91,7 @@ install(

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_dependencies(pluginlib rosbag2_storage rosidl_typesupport_introspection_cpp)
ament_export_dependencies(pluginlib rosbag2_compression rosbag2_storage rosidl_typesupport_introspection_cpp)

if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
Expand All @@ -109,6 +112,13 @@ if(BUILD_TESTING)
RUNTIME DESTINATION bin)
pluginlib_export_plugin_description_file(rosbag2_cpp test/rosbag2_cpp/converter_test_plugin.xml)

ament_add_gmock(test_compression_options
test/rosbag2_cpp/test_compression_options.cpp)
if(TARGET test_compression_options)
target_include_directories(test_compression_options PRIVATE include)
target_link_libraries(test_compression_options ${PROJECT_NAME})
endif()

ament_add_gmock(test_converter_factory
test/rosbag2_cpp/test_converter_factory.cpp)
if(TARGET test_converter_factory)
Expand Down Expand Up @@ -156,6 +166,7 @@ if(BUILD_TESTING)
ament_index_cpp
Poco
rcutils
rosbag2_compression
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
Expand Down
54 changes: 54 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/compression_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
#define ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_

#include <string>

#include "visibility_control.hpp"

namespace rosbag2_cpp
{

/**
* Modes are used to specify whether to compress by individual serialized bag messages or by file.
* rosbag2_cpp defaults to NONE.
*/
enum class ROSBAG2_CPP_PUBLIC CompressionMode : uint32_t
{
NONE = 0,
FILE,
MESSAGE,
LAST_MODE = MESSAGE
};

/**
* Converts a string into a rosbag2_cpp::CompressionMode enum.
*
* \param compression_mode A case insensitive string that is either "FILE" or "MESSAGE".
* \return CompressionMode NONE if compression_mode is invalid. FILE or MESSAGE otherwise.
*/
ROSBAG2_CPP_PUBLIC CompressionMode compression_mode_from_string(const std::string & compression_mode);

/**
* Converts a rosbag2_cpp::CompressionMode enum into a string.
*
* \param compression_mode A CompressionMode enum.
* \return The corresponding mode as a string.
*/
ROSBAG2_CPP_PUBLIC std::string compression_mode_to_string(CompressionMode compression_mode);

} // namespace rosbag2_cpp
#endif // ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
33 changes: 33 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
#include <string>
#include <vector>

#include "rosbag2_cpp/compression_options.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_compression/base_decompressor_interface.hpp"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
Expand Down Expand Up @@ -121,14 +123,45 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::string & converter_serialization_format,
const std::string & storage_serialization_format);

/**
* Opens a storage plugin for read only.
*
* \throws std::runtime_error If no storage could be initialized.
*/
virtual void open_storage();

/**
* Initializes the decompressor if a compression mode is specified in the metadata.
*
* \throws std::runtime_error If compression format doesn't exist.
*/
virtual void setup_compression();

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
rosbag2_storage::BagMetadata metadata_{};
std::vector<std::string> file_paths_{}; // List of database files.
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from
rosbag2_cpp::CompressionMode compression_mode_{rosbag2_cpp::CompressionMode::NONE};

protected:
/**
* Checks if the compression mode is of type MESSAGE and if so, decompresses the message.
*
* \param message A serialized bag message
*/
virtual void decompress_message(rosbag2_storage::SerializedBagMessage * message);

/**
* Checks if the compression mode is of type FILE and if so, decompresses the file.
*
* \param uri Relative path as a string
*/
virtual void decompress_file(const std::string & uri);
};

} // namespace readers
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<depend>pluginlib</depend>
<depend>poco_vendor</depend>
<depend>rcutils</depend>
<depend>rosbag2_compression</depend>
<depend>rosbag2_storage</depend>
<depend>rosidl_generator_cpp</depend>
<depend>rosidl_typesupport_cpp</depend>
Expand Down
69 changes: 69 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/compression_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <string>

#include "rosbag2_cpp/compression_options.hpp"
#include "rosbag2_cpp/logging.hpp"

namespace rosbag2_cpp
{

namespace
{

constexpr const char kCompressionModeNoneStr[] = "NONE";
constexpr const char kCompressionModeFileStr[] = "FILE";
constexpr const char kCompressionModeMessageStr[] = "MESSAGE";

std::string to_upper(const std::string & text)
{
std::string uppercase_text = text;
std::transform(uppercase_text.begin(), uppercase_text.end(), uppercase_text.begin(), ::toupper);
return uppercase_text;
}
} // namespace

CompressionMode compression_mode_from_string(const std::string & compression_mode)
{
const auto compression_mode_upper = to_upper(compression_mode);
if (compression_mode.empty() || compression_mode_upper == kCompressionModeNoneStr) {
return CompressionMode::NONE;
} else if (compression_mode_upper == kCompressionModeFileStr) {
return CompressionMode::FILE;
} else if (compression_mode_upper == kCompressionModeMessageStr) {
return CompressionMode::MESSAGE;
} else {
ROSBAG2_CPP_LOG_ERROR_STREAM(
"CompressionMode: \"" << compression_mode << "\" is not supported!");
return CompressionMode::NONE;
}
}

std::string compression_mode_to_string(const CompressionMode compression_mode)
{
switch (compression_mode) {
case CompressionMode::NONE:
return kCompressionModeNoneStr;
case CompressionMode::FILE:
return kCompressionModeFileStr;
case CompressionMode::MESSAGE:
return kCompressionModeMessageStr;
default:
ROSBAG2_CPP_LOG_ERROR_STREAM("CompressionMode not supported!");
return kCompressionModeNoneStr;
}
}
} // namespace rosbag2_cpp
58 changes: 46 additions & 12 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "rcpputils/filesystem_helper.hpp"
#include "rosbag2_compression/zstd_decompressor.hpp"

#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
Expand Down Expand Up @@ -48,6 +49,30 @@ void SequentialReader::reset()
storage_.reset();
}

void SequentialReader::open_storage()
{
storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
if (!storage_) {
throw std::runtime_error{"No storage could be initialized. Abort"};
}
}

void SequentialReader::setup_compression()
{
compression_mode_ = compression_mode_from_string(metadata_.compression_mode);
if (compression_mode_ != rosbag2_cpp::CompressionMode::NONE) {
// TODO(piraka9011): Replace this with a compressor_factory.
if (metadata_.compression_format == "zstd") {
decompressor_ = std::make_unique<rosbag2_compression::ZstdDecompressor>();
} else {
std::stringstream err;
err << "Unsupported compression format " << metadata_.compression_format;
throw std::runtime_error(err.str());
}
}
}

void SequentialReader::open(
const StorageOptions & storage_options, const ConverterOptions & converter_options)
{
Expand All @@ -60,21 +85,12 @@ void SequentialReader::open(
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
return;
}

open_storage();
setup_compression();
file_paths_ = metadata_.relative_file_paths;
current_file_iterator_ = file_paths_.begin();

storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
} else {
storage_ = storage_factory_->open_read_only(
storage_options.uri, storage_options.storage_id);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
open_storage();
metadata_ = storage_->get_metadata();
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
Expand Down Expand Up @@ -112,10 +128,19 @@ bool SequentialReader::has_next()
throw std::runtime_error("Bag is not open. Call open() before reading.");
}

void SequentialReader::decompress_message(
rosbag2_storage::SerializedBagMessage * message)
{
if (compression_mode_ == CompressionMode::MESSAGE) {
decompressor_->decompress_serialized_bag_message(message);
}
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialReader::read_next()
{
if (storage_) {
auto message = storage_->read_next();
decompress_message(message.get());
return converter_ ? converter_->convert(message) : message;
}
throw std::runtime_error("Bag is not open. Call open() before reading.");
Expand All @@ -134,10 +159,19 @@ bool SequentialReader::has_next_file() const
return current_file_iterator_ + 1 != file_paths_.end();
}

void SequentialReader::decompress_file(const std::string & uri)
{
if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_CPP_LOG_DEBUG_STREAM("Decompressing " << uri);
*current_file_iterator_ = decompressor_->decompress_uri(uri);
}
}

void SequentialReader::load_next_file()
{
assert(current_file_iterator_ != file_paths_.end());
current_file_iterator_++;
decompress_file(*current_file_iterator_);
}

std::string SequentialReader::get_current_file() const
Expand Down
Loading