Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ install(

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME}_zstd)
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor zstd)
ament_export_dependencies(rosbag2_storage rcutils zstd_vendor)

if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
Expand Down
13 changes: 12 additions & 1 deletion rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ find_package(poco_vendor)
find_package(Poco COMPONENTS Foundation)
find_package(rcpputils REQUIRED)
find_package(rcutils REQUIRED)
find_package(rosbag2_compression REQUIRED)
find_package(rosbag2_storage REQUIRED)
find_package(rosidl_generator_cpp REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)

add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/compression_options.cpp
src/rosbag2_cpp/converter.cpp
src/rosbag2_cpp/info.cpp
src/rosbag2_cpp/reader.cpp
Expand All @@ -57,6 +59,7 @@ ament_target_dependencies(${PROJECT_NAME}
Poco
rcpputils
rcutils
rosbag2_compression
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
Expand Down Expand Up @@ -88,7 +91,7 @@ install(

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_dependencies(pluginlib rosbag2_storage rosidl_typesupport_introspection_cpp)
ament_export_dependencies(pluginlib rosbag2_compression rosbag2_storage rosidl_typesupport_introspection_cpp)

if(BUILD_TESTING)
find_package(ament_cmake_gmock REQUIRED)
Expand All @@ -109,6 +112,13 @@ if(BUILD_TESTING)
RUNTIME DESTINATION bin)
pluginlib_export_plugin_description_file(rosbag2_cpp test/rosbag2_cpp/converter_test_plugin.xml)

ament_add_gmock(test_compression_options
test/rosbag2_cpp/test_compression_options.cpp)
if(TARGET test_compression_options)
target_include_directories(test_compression_options PRIVATE include)
target_link_libraries(test_compression_options ${PROJECT_NAME})
endif()

ament_add_gmock(test_converter_factory
test/rosbag2_cpp/test_converter_factory.cpp)
if(TARGET test_converter_factory)
Expand Down Expand Up @@ -156,6 +166,7 @@ if(BUILD_TESTING)
ament_index_cpp
Poco
rcutils
rosbag2_compression
rosbag2_storage
rosidl_generator_cpp
rosidl_typesupport_introspection_cpp
Expand Down
51 changes: 51 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/compression_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
#define ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_

#include <string>

namespace rosbag2_cpp
{

/**
* Modes are used to specify whether to compress by individual serialized bag messages or by file.
* rosbag2_cpp defaults to NONE.
*/
enum class CompressionMode
Comment thread
piraka9011 marked this conversation as resolved.
Outdated
{
NONE,
FILE,
MESSAGE,
};
Comment thread
piraka9011 marked this conversation as resolved.
Outdated

/**
* Converts a string into a rosbag2_cpp::CompressionMode enum.
*
* \param compression_mode A case insensitive string that is either "FILE" or "MESSAGE".
* \return CompressionMode NONE if compression_mode is invalid. FILE or MESSAGE otherwise.
*/
CompressionMode compression_mode_from_string(const std::string & compression_mode);

/**
* Converts a rosbag2_cpp::CompressionMode enum into a string.
*
* \param compression_mode A CompressionMode enum.
* \return The corresponding mode as a string.
*/
std::string compression_mode_to_string(CompressionMode compression_mode);

} // namespace rosbag2_cpp
#endif // ROSBAG2_CPP__COMPRESSION_OPTIONS_HPP_
33 changes: 33 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
#include <string>
#include <vector>

#include "rosbag2_cpp/compression_options.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_compression/base_decompressor_interface.hpp"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
Expand Down Expand Up @@ -121,14 +123,45 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
const std::string & converter_serialization_format,
const std::string & storage_serialization_format);

/**
* Opens a storage plugin for read only.
*
* \throws runtime_error If no storage could be initialized.
Comment thread
piraka9011 marked this conversation as resolved.
Outdated
*/
virtual void open_storage();

/**
* Initializes the decompressor if a compression mode is specified in the metadata.
*
* \throws runtime_error If compression format doesn't exist.
*/
virtual void setup_compression();

std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_{};
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};
std::shared_ptr<rosbag2_storage::storage_interfaces::ReadOnlyInterface> storage_{};
std::unique_ptr<Converter> converter_{};
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io_{};
rosbag2_storage::BagMetadata metadata_{};
std::vector<std::string> file_paths_{}; // List of database files.
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from
rosbag2_cpp::CompressionMode compression_mode_{rosbag2_cpp::CompressionMode::NONE};

protected:
/**
* Checks if the compression mode is of type MESSAGE and if so, decompresses the message.
*
* \param message A serialized bag message
*/
virtual void decompress_message(rosbag2_storage::SerializedBagMessage * message);

/**
* Checks if the compression mode is of type FILE and if so, decompresses the file.
*
* \param uri Relative path as a string
*/
virtual void decompress_file(const std::string & uri);
};

} // namespace readers
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<depend>pluginlib</depend>
<depend>poco_vendor</depend>
<depend>rcutils</depend>
<depend>rosbag2_compression</depend>
<depend>rosbag2_storage</depend>
<depend>rosidl_generator_cpp</depend>
<depend>rosidl_typesupport_cpp</depend>
Expand Down
69 changes: 69 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/compression_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <string>

#include "rosbag2_cpp/compression_options.hpp"
#include "rosbag2_cpp/logging.hpp"

namespace rosbag2_cpp
{

namespace
{

constexpr const char kCompressionModeNoneStr[] = "NONE";
constexpr const char kCompressionModeFileStr[] = "FILE";
constexpr const char kCompressionModeMessageStr[] = "MESSAGE";

std::string to_upper(const std::string & text)
{
std::string uppercase_text = text;
std::transform(uppercase_text.begin(), uppercase_text.end(), uppercase_text.begin(), ::toupper);
return uppercase_text;
}
} // namespace

CompressionMode compression_mode_from_string(const std::string & compression_mode)
{
const auto compression_mode_upper = to_upper(compression_mode);
if (compression_mode.empty() || compression_mode_upper == kCompressionModeNoneStr) {
return CompressionMode::NONE;
} else if (compression_mode_upper == kCompressionModeFileStr) {
return CompressionMode::FILE;
} else if (compression_mode_upper == kCompressionModeMessageStr) {
return CompressionMode::MESSAGE;
} else {
ROSBAG2_CPP_LOG_ERROR_STREAM(
"CompressionMode: \"" << compression_mode << "\" is not supported!");
return CompressionMode::NONE;
}
}

std::string compression_mode_to_string(const CompressionMode compression_mode)
{
switch (compression_mode) {
case CompressionMode::NONE:
return kCompressionModeNoneStr;
case CompressionMode::FILE:
return kCompressionModeFileStr;
case CompressionMode::MESSAGE:
return kCompressionModeMessageStr;
default:
ROSBAG2_CPP_LOG_ERROR_STREAM("CompressionMode not supported!");
return kCompressionModeNoneStr;
}
}
} // namespace rosbag2_cpp
58 changes: 46 additions & 12 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "rcpputils/filesystem_helper.hpp"
#include "rosbag2_compression/zstd_decompressor.hpp"

#include "rosbag2_cpp/logging.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
Expand Down Expand Up @@ -48,6 +49,30 @@ void SequentialReader::reset()
storage_.reset();
}

void SequentialReader::open_storage()
{
storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
if (!storage_) {
throw std::runtime_error{"No storage could be initialized. Abort"};
}
}

void SequentialReader::setup_compression()
Comment thread
piraka9011 marked this conversation as resolved.
{
compression_mode_ = compression_mode_from_string(metadata_.compression_mode);
if (compression_mode_ != rosbag2_cpp::CompressionMode::NONE) {
// TODO(piraka9011): Replace this with a compressor_factory.
if (metadata_.compression_format == "zstd") {
decompressor_ = std::make_unique<rosbag2_compression::ZstdDecompressor>();
} else {
std::stringstream err;
err << "Unsupported compression format " << metadata_.compression_format;
throw std::runtime_error(err.str());
}
}
}

void SequentialReader::open(
const StorageOptions & storage_options, const ConverterOptions & converter_options)
{
Expand All @@ -60,21 +85,12 @@ void SequentialReader::open(
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
return;
}

open_storage();
setup_compression();
file_paths_ = metadata_.relative_file_paths;
current_file_iterator_ = file_paths_.begin();

storage_ = storage_factory_->open_read_only(
*current_file_iterator_, metadata_.storage_identifier);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
} else {
storage_ = storage_factory_->open_read_only(
storage_options.uri, storage_options.storage_id);
if (!storage_) {
throw std::runtime_error("No storage could be initialized. Abort");
}
open_storage();
metadata_ = storage_->get_metadata();
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
Expand Down Expand Up @@ -112,10 +128,19 @@ bool SequentialReader::has_next()
throw std::runtime_error("Bag is not open. Call open() before reading.");
}

void SequentialReader::decompress_message(
rosbag2_storage::SerializedBagMessage * message)
{
if (compression_mode_ == CompressionMode::MESSAGE) {
decompressor_->decompress_serialized_bag_message(message);
}
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialReader::read_next()
{
if (storage_) {
auto message = storage_->read_next();
Comment thread
piraka9011 marked this conversation as resolved.
decompress_message(message.get());
return converter_ ? converter_->convert(message) : message;
}
throw std::runtime_error("Bag is not open. Call open() before reading.");
Expand All @@ -134,10 +159,19 @@ bool SequentialReader::has_next_file() const
return current_file_iterator_ + 1 != file_paths_.end();
}

void SequentialReader::decompress_file(const std::string & uri)
{
if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_CPP_LOG_DEBUG_STREAM("Decompressing " << uri);
*current_file_iterator_ = decompressor_->decompress_uri(uri);
}
}

void SequentialReader::load_next_file()
{
assert(current_file_iterator_ != file_paths_.end());
current_file_iterator_++;
decompress_file(*current_file_iterator_);
}

std::string SequentialReader::get_current_file() const
Expand Down
Loading