diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 5f2d0796ed..25d2e4bb5d 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -144,14 +144,38 @@ class ROSBAG2_CPP_PUBLIC Writer final * \param topic_name the string of the topic this messages belongs to * \param type_name the string of the type associated with this message * \param time The time stamp of the message - * \throws runtime_error if the Writer is not open. + * \throws runtime_error if the Writer is not open or duplicating message is failed. */ + [[deprecated( + "Use write(std::shared_ptr message," \ + " const std::string & topic_name," \ + " const std::string & type_name," \ + " const rclcpp::Time & time) instead." + )]] void write( const rclcpp::SerializedMessage & message, const std::string & topic_name, const std::string & type_name, const rclcpp::Time & time); + /** + * Write a serialized message to a bagfile. + * The topic will be created if it has not been created already. + * + * \warning after calling this function, the serialized data will no longer be managed by message. + * + * \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile + * \param topic_name the string of the topic this messages belongs to + * \param type_name the string of the type associated with this message + * \param time The time stamp of the message + * \throws runtime_error if the Writer is not open. + */ + void write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rclcpp::Time & time); + /** * Write a non-serialized message to a bagfile. * The topic will be created if it has not been created already. @@ -168,10 +192,10 @@ class ROSBAG2_CPP_PUBLIC Writer final const std::string & topic_name, const rclcpp::Time & time) { - rclcpp::SerializedMessage serialized_msg; + auto serialized_msg = std::make_shared(); rclcpp::Serialization serialization; - serialization.serialize_message(&message, &serialized_msg); + serialization.serialize_message(&message, serialized_msg.get()); return write(serialized_msg, topic_name, rosidl_generator_traits::name(), time); } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 77632a3977..75457b32e4 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -16,11 +16,13 @@ #include #include +#include #include #include #include #include +#include "rclcpp/logging.hpp" #include "rclcpp/serialized_message.hpp" #include "rclcpp/time.hpp" @@ -117,14 +119,71 @@ void Writer::write( serialized_bag_message->topic_name = topic_name; serialized_bag_message->time_stamp = time.nanoseconds(); - // temporary store the payload in a shared_ptr. - // add custom no-op deleter to avoid deep copying data. serialized_bag_message->serialized_data = std::shared_ptr( - const_cast(&message.get_rcl_serialized_message()), - [](rcutils_uint8_array_t * /* data */) {}); + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + // While using compression mode and cache size isn't 0, another thread deals with this serialized + // message asynchronously. + // In order to keep serialized message valid, have to duplicate message. + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + + rcutils_ret_t ret = rcutils_uint8_array_init( + serialized_bag_message->serialized_data.get(), + message.get_rcl_serialized_message().buffer_capacity, + &allocator); + if (ret != RCUTILS_RET_OK) { + auto err = std::string("Failed to call rcutils_uint8_array_init(): return "); + err += ret; + throw std::runtime_error(err); + } + + std::memcpy( + serialized_bag_message->serialized_data->buffer, + message.get_rcl_serialized_message().buffer, + message.get_rcl_serialized_message().buffer_length); + + serialized_bag_message->serialized_data->buffer_length = + message.get_rcl_serialized_message().buffer_length; return write( serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); } +void Writer::write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rclcpp::Time & time) +{ + auto serialized_bag_message = std::make_shared(); + serialized_bag_message->topic_name = topic_name; + serialized_bag_message->time_stamp = time.nanoseconds(); + + serialized_bag_message->serialized_data = std::shared_ptr( + new rcutils_uint8_array_t, + [](rcutils_uint8_array_t * msg) { + auto fini_return = rcutils_uint8_array_fini(msg); + delete msg; + if (fini_return != RCUTILS_RET_OK) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rosbag2_cpp"), + "Failed to destroy serialized message: " << rcutils_get_error_string().str); + } + }); + + *serialized_bag_message->serialized_data = message->release_rcl_serialized_message(); + + return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); +} + } // namespace rosbag2_cpp diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index 22a8272f62..256f63a13e 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -76,8 +76,12 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example) writer.write(bag_message, "/my/other/topic", "test_msgs/msg/BasicTypes"); // yet another alternative, writing the rclcpp::SerializedMessage directly + std::shared_ptr serialized_msg2 = + std::make_shared(); + serialization.serialize_message(&test_msg, serialized_msg2.get()); + writer.write( - serialized_msg, "/yet/another/topic", "test_msgs/msg/BasicTypes", + serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes", rclcpp::Clock().now()); // writing a non-serialized message diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 847c3fab2a..3f90c1c587 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -25,6 +25,7 @@ #include #include "rclcpp/logging.hpp" +#include "rclcpp/clock.hpp" #include "rosbag2_cpp/writer.hpp" @@ -234,33 +235,8 @@ Recorder::create_subscription( topic_name, topic_type, qos, - [this, topic_name](std::shared_ptr message) { - auto bag_message = std::make_shared(); - // the serialized bag message takes ownership of the incoming rclcpp serialized message - // we therefore have to make sure to cleanup that memory in a custom deleter. - bag_message->serialized_data = std::shared_ptr( - new rcutils_uint8_array_t, - [](rcutils_uint8_array_t * msg) { - auto fini_return = rcutils_uint8_array_fini(msg); - delete msg; - if (fini_return != RCUTILS_RET_OK) { - RCLCPP_ERROR_STREAM( - rclcpp::get_logger("rosbag2_transport"), - "Failed to destroy serialized message: " << rcutils_get_error_string().str); - } - }); - *bag_message->serialized_data = message->release_rcl_serialized_message(); - bag_message->topic_name = topic_name; - rcutils_time_point_value_t time_stamp; - int error = rcutils_system_time_now(&time_stamp); - if (error != RCUTILS_RET_OK) { - RCLCPP_ERROR_STREAM( - this->get_logger(), - "Error getting current time. Error:" << rcutils_get_error_string().str); - } - bag_message->time_stamp = time_stamp; - - writer_->write(bag_message); + [this, topic_name, topic_type](std::shared_ptr message) { + writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now()); }); return subscription; }