From 16d8164c82704f48745079a5006035a4a6490c79 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Mon, 6 Sep 2021 11:32:29 +0800 Subject: [PATCH 1/9] Fix a bug on invalid pointer address when using "MESSAGE" compression mode to write data Signed-off-by: Barry Xu --- rosbag2_cpp/include/rosbag2_cpp/writer.hpp | 2 +- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 40 +++++++++++++++++++--- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 5f2d0796ed..e41629a2bd 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -144,7 +144,7 @@ class ROSBAG2_CPP_PUBLIC Writer final * \param topic_name the string of the topic this messages belongs to * \param type_name the string of the type associated with this message * \param time The time stamp of the message - * \throws runtime_error if the Writer is not open. + * \throws runtime_error if the Writer is not open or duplicating message is failed. */ void write( const rclcpp::SerializedMessage & message, diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 77632a3977..3db6bff689 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -16,11 +16,13 @@ #include #include +#include #include #include #include #include +#include "rclcpp/logging.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/time.hpp" @@ -117,11 +119,41 @@ void Writer::write( serialized_bag_message->topic_name = topic_name; serialized_bag_message->time_stamp = time.nanoseconds(); - // temporary store the payload in a shared_ptr. - // add custom no-op deleter to avoid deep copying data. serialized_bag_message->serialized_data = std::shared_ptr( - const_cast(&message.get_rcl_serialized_message()), - [](rcutils_uint8_array_t * /* data */) {}); + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + // While using message compression mode, another thread deals with this serialized message + // asynchronously. + // In order to keep serialized message valid, have to duplicate message. + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + rcutils_ret_t ret = rcutils_uint8_array_init( + serialized_bag_message->serialized_data.get(), + message.get_rcl_serialized_message().buffer_capacity, + &allocator); + if (ret != RCUTILS_RET_OK) { + auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); + err += ret; + throw std::runtime_error(err); + } + + std::memcpy( + serialized_bag_message->serialized_data.get()->buffer, + message.get_rcl_serialized_message().buffer, + message.get_rcl_serialized_message().buffer_length); + + serialized_bag_message->serialized_data.get()->buffer_length = + message.get_rcl_serialized_message().buffer_length; return write( serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); From 7ad085411fd8fecf5c224cb3f30d82c9e127f893 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Mon, 6 Sep 2021 16:08:49 +0800 Subject: [PATCH 2/9] Change code comments Signed-off-by: Barry Xu --- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 3db6bff689..6a387fcdc4 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -131,8 +131,8 @@ void Writer::write( } }); - // While using message compression mode, another thread deals with this serialized message - // asynchronously. + // While using compression mode and cache size isn't 0, another thread deals with this serialized + // message asynchronously. // In order to keep serialized message valid, have to duplicate message. rcutils_allocator_t allocator = rcutils_get_default_allocator(); From 83f9d9d30ab81ec869068a96aece78e507851a4d Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 7 Sep 2021 10:48:26 +0800 Subject: [PATCH 3/9] Add new interface to check if processing message is async Signed-off-by: Barry Xu --- .../sequential_compression_writer.hpp | 7 ++ .../sequential_compression_writer.cpp | 9 +++ .../base_writer_interface.hpp | 2 + .../rosbag2_cpp/writers/sequential_writer.hpp | 7 ++ rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 71 ++++++++++--------- .../rosbag2_cpp/writers/sequential_writer.cpp | 9 +++ .../mock_sequential_writer.hpp | 5 ++ 7 files changed, 76 insertions(+), 34 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index c2cbceeb55..3203b5a045 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -119,6 +119,13 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter */ void close() override; + /** + * Get the information on whether message processing is asynchronous + * + * \return true for asynchronous processing. Otherwise, synchronous processing + */ + bool is_async_process_message() override; + protected: /** * Compress a file and update the metadata file path. diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8d1ec6872a..2db7148aee 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -216,6 +216,15 @@ void SequentialCompressionWriter::close() storage_factory_.reset(); } +bool SequentialCompressionWriter::is_async_process_message() +{ + if (compression_options_.compression_mode == CompressionMode::MESSAGE) { + return true; + } else { + return false; + } +} + void SequentialCompressionWriter::create_topic( const rosbag2_storage::TopicMetadata & topic_with_type) { diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 396b6a4ae2..54522f2c00 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -51,6 +51,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface * \returns true if snapshot is successful, false if snapshot fails or is not supported */ virtual bool take_snapshot() = 0; + + virtual bool is_async_process_message() = 0; }; } // namespace writer_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 4bc14b47bb..0fdf83ea42 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -114,6 +114,13 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter */ bool take_snapshot() override; + /** + * Get the information on whether message processing is asynchronous + * + * \return true for asynchronous processing. Otherwise, synchronous processing + */ + bool is_async_process_message() override; + protected: std::string base_folder_; std::unique_ptr storage_factory_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 6a387fcdc4..09882c54df 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -119,42 +119,45 @@ void Writer::write( serialized_bag_message->topic_name = topic_name; serialized_bag_message->time_stamp = time.nanoseconds(); - serialized_bag_message->serialized_data = std::shared_ptr( - new rcutils_uint8_array_t, - [](rcutils_uint8_array_t * msg) { - auto fini_return = rcutils_uint8_array_fini(msg); - delete msg; - if (fini_return != RCUTILS_RET_OK) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rosbag2_cpp"), - "Failed to destroy serialized message: " << rcutils_get_error_string().str); - } - }); - - // While using compression mode and cache size isn't 0, another thread deals with this serialized - // message asynchronously. - // In order to keep serialized message valid, have to duplicate message. - - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - - rcutils_ret_t ret = rcutils_uint8_array_init( - serialized_bag_message->serialized_data.get(), - message.get_rcl_serialized_message().buffer_capacity, - &allocator); - if (ret != RCUTILS_RET_OK) { - auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); - err += ret; - throw std::runtime_error(err); + if (writer_impl_->is_async_process_message()) { + // For asynchronous processing message, duplicate message + serialized_bag_message->serialized_data = std::shared_ptr( + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + rcutils_ret_t ret = rcutils_uint8_array_init( + serialized_bag_message->serialized_data.get(), + message.get_rcl_serialized_message().buffer_capacity, + &allocator); + if (ret != RCUTILS_RET_OK) { + auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); + err += ret; + throw std::runtime_error(err); + } + + std::memcpy( + serialized_bag_message->serialized_data.get()->buffer, + message.get_rcl_serialized_message().buffer, + message.get_rcl_serialized_message().buffer_length); + + serialized_bag_message->serialized_data.get()->buffer_length = + message.get_rcl_serialized_message().buffer_length; + } else { + serialized_bag_message->serialized_data = std::shared_ptr( + const_cast(&message.get_rcl_serialized_message()), + [](rcutils_uint8_array_t * /* data */) {}); } - std::memcpy( - serialized_bag_message->serialized_data.get()->buffer, - message.get_rcl_serialized_message().buffer, - message.get_rcl_serialized_message().buffer_length); - - serialized_bag_message->serialized_data.get()->buffer_length = - message.get_rcl_serialized_message().buffer_length; - return write( serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 2273a29e25..3b67da6662 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -313,6 +313,15 @@ bool SequentialWriter::take_snapshot() return true; } +bool SequentialWriter::is_async_process_message() +{ + if (storage_options_.max_cache_size != 0u) { + return true; + } else { + return false; + } +} + std::shared_ptr SequentialWriter::get_writeable_message( std::shared_ptr message) diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index 0845e5d3f8..d81392f437 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -62,6 +62,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn std::swap(snapshot_buffer_, messages_); snapshot_buffer_.clear(); return true; + } + + bool is_async_process_message() override + { + return false; } const std::vector> & get_messages() From 7f3e485d85649023a3ef12b44698737cb99e2d89 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 7 Sep 2021 11:22:58 +0800 Subject: [PATCH 4/9] If enable convert mode, need not duplicate input message Signed-off-by: Barry Xu --- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 3b67da6662..78d685e08d 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -315,7 +315,7 @@ bool SequentialWriter::take_snapshot() bool SequentialWriter::is_async_process_message() { - if (storage_options_.max_cache_size != 0u) { + if (converter_ == nullptr && storage_options_.max_cache_size != 0u) { return true; } else { return false; From 3c996de77a6b39024e6c41905c8a8250cc7f406a Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 7 Sep 2021 13:59:47 +0800 Subject: [PATCH 5/9] Update interface name Signed-off-by: Barry Xu --- .../rosbag2_compression/sequential_compression_writer.hpp | 6 +++--- .../rosbag2_compression/sequential_compression_writer.cpp | 2 +- .../rosbag2_cpp/writer_interfaces/base_writer_interface.hpp | 2 +- .../include/rosbag2_cpp/writers/sequential_writer.hpp | 6 +++--- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 2 +- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 2 +- .../test/rosbag2_transport/mock_sequential_writer.hpp | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index 3203b5a045..41925b4c13 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -120,11 +120,11 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter void close() override; /** - * Get the information on whether message processing is asynchronous + * Check if write processing needs to have the ownership of serialized message * - * \return true for asynchronous processing. Otherwise, synchronous processing + * \return true if writer processing needs the ownership. Otherwise, return false. */ - bool is_async_process_message() override; + bool request_owned_serialized_data() override; protected: /** diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 2db7148aee..34982f1790 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -216,7 +216,7 @@ void SequentialCompressionWriter::close() storage_factory_.reset(); } -bool SequentialCompressionWriter::is_async_process_message() +bool SequentialCompressionWriter::request_owned_serialized_data() { if (compression_options_.compression_mode == CompressionMode::MESSAGE) { return true; diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 54522f2c00..0963e3d8ac 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -52,7 +52,7 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface */ virtual bool take_snapshot() = 0; - virtual bool is_async_process_message() = 0; + virtual bool request_owned_serialized_data() = 0; }; } // namespace writer_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 0fdf83ea42..157fcc3792 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -115,11 +115,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter bool take_snapshot() override; /** - * Get the information on whether message processing is asynchronous + * Check if write processing needs to have the ownership of serialized message * - * \return true for asynchronous processing. Otherwise, synchronous processing + * \return true if writer processing needs the ownership. Otherwise, return false. */ - bool is_async_process_message() override; + bool request_owned_serialized_data() override; protected: std::string base_folder_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 09882c54df..36dbc1f5aa 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -119,7 +119,7 @@ void Writer::write( serialized_bag_message->topic_name = topic_name; serialized_bag_message->time_stamp = time.nanoseconds(); - if (writer_impl_->is_async_process_message()) { + if (writer_impl_->request_owned_serialized_data()) { // For asynchronous processing message, duplicate message serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t, diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 78d685e08d..015454d8ec 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -313,7 +313,7 @@ bool SequentialWriter::take_snapshot() return true; } -bool SequentialWriter::is_async_process_message() +bool SequentialWriter::request_owned_serialized_data() { if (converter_ == nullptr && storage_options_.max_cache_size != 0u) { return true; diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index d81392f437..08f5ef9d1a 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -64,7 +64,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn return true; } - bool is_async_process_message() override + bool request_owned_serialized_data() override { return false; } From 3264d5a4487ed1d3c91737b41f556f48aaf50956 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 7 Sep 2021 14:19:28 +0800 Subject: [PATCH 6/9] Update code Signed-off-by: Barry Xu --- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 36dbc1f5aa..5ccc72667e 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -141,7 +141,7 @@ void Writer::write( &allocator); if (ret != RCUTILS_RET_OK) { auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); - err += ret; + err += std::to_string(ret); throw std::runtime_error(err); } From c29cead8301d95cb4ae896c42882e6ac3c574b14 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 8 Sep 2021 13:12:45 +0800 Subject: [PATCH 7/9] Address review comments Signed-off-by: Barry Xu --- .../rosbag2_compression/sequential_compression_writer.cpp | 6 +----- rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 4 +++- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 6 +----- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 34982f1790..52d8b4eaf5 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -218,11 +218,7 @@ void SequentialCompressionWriter::close() bool SequentialCompressionWriter::request_owned_serialized_data() { - if (compression_options_.compression_mode == CompressionMode::MESSAGE) { - return true; - } else { - return false; - } + return compression_options_.compression_mode == CompressionMode::MESSAGE; } void SequentialCompressionWriter::create_topic( diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 5ccc72667e..11a7379e11 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -120,7 +120,7 @@ void Writer::write( serialized_bag_message->time_stamp = time.nanoseconds(); if (writer_impl_->request_owned_serialized_data()) { - // For asynchronous processing message, duplicate message + // Need owned serialized data, so duplicate message serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t, [](rcutils_uint8_array_t * msg) { @@ -153,6 +153,8 @@ void Writer::write( serialized_bag_message->serialized_data.get()->buffer_length = message.get_rcl_serialized_message().buffer_length; } else { + // temporary store the payload in a shared_ptr. + // add custom no-op deleter to avoid deep copying data. serialized_bag_message->serialized_data = std::shared_ptr( const_cast(&message.get_rcl_serialized_message()), [](rcutils_uint8_array_t * /* data */) {}); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 015454d8ec..2463be7315 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -315,11 +315,7 @@ bool SequentialWriter::take_snapshot() bool SequentialWriter::request_owned_serialized_data() { - if (converter_ == nullptr && storage_options_.max_cache_size != 0u) { - return true; - } else { - return false; - } + return converter_ == nullptr && storage_options_.max_cache_size != 0u; } std::shared_ptr From 86602a7a48c76af4ff2b31b60d18ac5596f49c08 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 9 Sep 2021 09:21:53 +0800 Subject: [PATCH 8/9] If compression mode is file, have cases without deep copy Signed-off-by: Barry Xu --- .../src/rosbag2_compression/sequential_compression_writer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 52d8b4eaf5..3df9bb2e81 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -218,7 +218,8 @@ void SequentialCompressionWriter::close() bool SequentialCompressionWriter::request_owned_serialized_data() { - return compression_options_.compression_mode == CompressionMode::MESSAGE; + return compression_options_.compression_mode == CompressionMode::MESSAGE || + SequentialWriter::request_owned_serialized_data(); } void SequentialCompressionWriter::create_topic( From bad09097d70e05a9723902b65fa561bb1bb6088f Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 14 Sep 2021 16:58:43 +0800 Subject: [PATCH 9/9] Make a helper function for readability Signed-off-by: Barry Xu --- rosbag2_cpp/include/rosbag2_cpp/writer.hpp | 3 + rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 69 ++++++++++--------- .../mock_sequential_writer.hpp | 2 +- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index e41629a2bd..87469e0258 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -183,6 +183,9 @@ class ROSBAG2_CPP_PUBLIC Writer final private: std::mutex writer_mutex_; std::unique_ptr writer_impl_; + void copy_message_into( + const rclcpp::SerializedMessage & message, + std::shared_ptr & serialized_bag_message); }; } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 11a7379e11..5cf8a892b4 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -121,37 +121,7 @@ void Writer::write( if (writer_impl_->request_owned_serialized_data()) { // Need owned serialized data, so duplicate message - serialized_bag_message->serialized_data = std::shared_ptr( - new rcutils_uint8_array_t, - [](rcutils_uint8_array_t * msg) { - auto fini_return = rcutils_uint8_array_fini(msg); - delete msg; - if (fini_return != RCUTILS_RET_OK) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rosbag2_cpp"), - "Failed to destroy serialized message: " << rcutils_get_error_string().str); - } - }); - - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - - rcutils_ret_t ret = rcutils_uint8_array_init( - serialized_bag_message->serialized_data.get(), - message.get_rcl_serialized_message().buffer_capacity, - &allocator); - if (ret != RCUTILS_RET_OK) { - auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); - err += std::to_string(ret); - throw std::runtime_error(err); - } - - std::memcpy( - serialized_bag_message->serialized_data.get()->buffer, - message.get_rcl_serialized_message().buffer, - message.get_rcl_serialized_message().buffer_length); - - serialized_bag_message->serialized_data.get()->buffer_length = - message.get_rcl_serialized_message().buffer_length; + copy_message_into(message, serialized_bag_message); } else { // temporary store the payload in a shared_ptr. // add custom no-op deleter to avoid deep copying data. @@ -164,4 +134,41 @@ void Writer::write( serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); } +void Writer::copy_message_into( + const rclcpp::SerializedMessage & message, + std::shared_ptr & serialized_bag_message) +{ + serialized_bag_message->serialized_data = std::shared_ptr( + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + rcutils_ret_t ret = rcutils_uint8_array_init( + serialized_bag_message->serialized_data.get(), + message.get_rcl_serialized_message().buffer_capacity, + &allocator); + if (ret != RCUTILS_RET_OK) { + auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); + err += std::to_string(ret); + throw std::runtime_error(err); + } + + std::memcpy( + serialized_bag_message->serialized_data.get()->buffer, + message.get_rcl_serialized_message().buffer, + message.get_rcl_serialized_message().buffer_length); + + serialized_bag_message->serialized_data.get()->buffer_length = + message.get_rcl_serialized_message().buffer_length; +} + } // namespace rosbag2_cpp diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index 08f5ef9d1a..fc33223665 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -62,7 +62,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn std::swap(snapshot_buffer_, messages_); snapshot_buffer_.clear(); return true; - } + } bool request_owned_serialized_data() override {