diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index c2cbceeb55..41925b4c13 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -119,6 +119,13 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter */ void close() override; + /** + * Check if write processing needs to have the ownership of serialized message + * + * \return true if writer processing needs the ownership. Otherwise, return false. + */ + bool request_owned_serialized_data() override; + protected: /** * Compress a file and update the metadata file path. diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 8d1ec6872a..3df9bb2e81 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -216,6 +216,12 @@ void SequentialCompressionWriter::close() storage_factory_.reset(); } +bool SequentialCompressionWriter::request_owned_serialized_data() +{ + return compression_options_.compression_mode == CompressionMode::MESSAGE || + SequentialWriter::request_owned_serialized_data(); +} + void SequentialCompressionWriter::create_topic( const rosbag2_storage::TopicMetadata & topic_with_type) { diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 5f2d0796ed..87469e0258 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -144,7 +144,7 @@ class ROSBAG2_CPP_PUBLIC Writer final * \param topic_name the string of the topic this messages belongs to * \param type_name the string of the type associated with this message * \param time The time stamp of the message - * \throws runtime_error if the Writer is not open. + * \throws runtime_error if the Writer is not open or duplicating message is failed. */ void write( const rclcpp::SerializedMessage & message, @@ -183,6 +183,9 @@ class ROSBAG2_CPP_PUBLIC Writer final private: std::mutex writer_mutex_; std::unique_ptr writer_impl_; + void copy_message_into( + const rclcpp::SerializedMessage & message, + std::shared_ptr & serialized_bag_message); }; } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 396b6a4ae2..0963e3d8ac 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -51,6 +51,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface * \returns true if snapshot is successful, false if snapshot fails or is not supported */ virtual bool take_snapshot() = 0; + + virtual bool request_owned_serialized_data() = 0; }; } // namespace writer_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 4bc14b47bb..157fcc3792 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -114,6 +114,13 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter */ bool take_snapshot() override; + /** + * Check if write processing needs to have the ownership of serialized message + * + * \return true if writer processing needs the ownership. Otherwise, return false. + */ + bool request_owned_serialized_data() override; + protected: std::string base_folder_; std::unique_ptr storage_factory_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 77632a3977..5cf8a892b4 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -16,11 +16,13 @@ #include #include +#include #include #include #include #include +#include "rclcpp/logging.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/time.hpp" @@ -117,14 +119,56 @@ void Writer::write( serialized_bag_message->topic_name = topic_name; serialized_bag_message->time_stamp = time.nanoseconds(); - // temporary store the payload in a shared_ptr. - // add custom no-op deleter to avoid deep copying data. - serialized_bag_message->serialized_data = std::shared_ptr( - const_cast(&message.get_rcl_serialized_message()), - [](rcutils_uint8_array_t * /* data */) {}); + if (writer_impl_->request_owned_serialized_data()) { + // Need owned serialized data, so duplicate message + copy_message_into(message, serialized_bag_message); + } else { + // temporary store the payload in a shared_ptr. + // add custom no-op deleter to avoid deep copying data. + serialized_bag_message->serialized_data = std::shared_ptr( + const_cast(&message.get_rcl_serialized_message()), + [](rcutils_uint8_array_t * /* data */) {}); + } return write( serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); } +void Writer::copy_message_into( + const rclcpp::SerializedMessage & message, + std::shared_ptr & serialized_bag_message) +{ + serialized_bag_message->serialized_data = std::shared_ptr( + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + rcutils_ret_t ret = rcutils_uint8_array_init( + serialized_bag_message->serialized_data.get(), + message.get_rcl_serialized_message().buffer_capacity, + &allocator); + if (ret != RCUTILS_RET_OK) { + auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); + err += std::to_string(ret); + throw std::runtime_error(err); + } + + std::memcpy( + serialized_bag_message->serialized_data.get()->buffer, + message.get_rcl_serialized_message().buffer, + message.get_rcl_serialized_message().buffer_length); + + serialized_bag_message->serialized_data.get()->buffer_length = + message.get_rcl_serialized_message().buffer_length; +} + } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 2273a29e25..2463be7315 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -313,6 +313,11 @@ bool SequentialWriter::take_snapshot() return true; } +bool SequentialWriter::request_owned_serialized_data() +{ + return converter_ == nullptr && storage_options_.max_cache_size != 0u; +} + std::shared_ptr SequentialWriter::get_writeable_message( std::shared_ptr message) diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index 0845e5d3f8..fc33223665 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -64,6 +64,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn return true; } + bool request_owned_serialized_data() override + { + return false; + } + const std::vector> & get_messages() { return messages_;