Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,38 @@ class ROSBAG2_CPP_PUBLIC Writer final
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
* \throws runtime_error if the Writer is not open or duplicating message is failed.
*/
[[deprecated(
"Use write(std::shared_ptr<rclcpp::SerializedMessage> message," \
" const std::string & topic_name," \
" const std::string & type_name," \
" const rclcpp::Time & time) instead."
)]]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 199 in this file is causing the Windows warning - since the "non-serialized ROS message" template function calls this "reference version".

I think that the solution is to create a shared_ptr to the SerializedBagMessage, instead of a local instance.

Something like:

  template<class MessageT>
  void write(
    const MessageT & message,
    const std::string & topic_name,
    const rclcpp::Time & time)
  {
    auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
    rclcpp::Serialization<MessageT> serialization;
    serialization.serialize_message(&message, serialized_msg.get());
    return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);

void write(
const rclcpp::SerializedMessage & message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \warning after calling this function, the serialized data will no longer be managed by message.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
void write(
std::shared_ptr<rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
Expand All @@ -168,10 +192,10 @@ class ROSBAG2_CPP_PUBLIC Writer final
const std::string & topic_name,
const rclcpp::Time & time)
{
rclcpp::SerializedMessage serialized_msg;
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();

rclcpp::Serialization<MessageT> serialization;
serialization.serialize_message(&message, &serialized_msg);
serialization.serialize_message(&message, serialized_msg.get());
return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);
}

Expand Down
67 changes: 63 additions & 4 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "rclcpp/logging.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

Expand Down Expand Up @@ -117,14 +119,71 @@ void Writer::write(
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

// temporary store the payload in a shared_ptr.
// add custom no-op deleter to avoid deep copying data.
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

// While using compression mode and cache size isn't 0, another thread deals with this serialized
// message asynchronously.
// In order to keep serialized message valid, have to duplicate message.

rcutils_allocator_t allocator = rcutils_get_default_allocator();

rcutils_ret_t ret = rcutils_uint8_array_init(
serialized_bag_message->serialized_data.get(),
message.get_rcl_serialized_message().buffer_capacity,
&allocator);
if (ret != RCUTILS_RET_OK) {
auto err = std::string("Failed to call rcutils_uint8_array_init(): return ");
err += ret;
throw std::runtime_error(err);
}

std::memcpy(
serialized_bag_message->serialized_data->buffer,
message.get_rcl_serialized_message().buffer,
message.get_rcl_serialized_message().buffer_length);

serialized_bag_message->serialized_data->buffer_length =
message.get_rcl_serialized_message().buffer_length;

return write(
serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::write(
std::shared_ptr<rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

*serialized_bag_message->serialized_data = message->release_rcl_serialized_message();

return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

} // namespace rosbag2_cpp
6 changes: 5 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
writer.write(bag_message, "/my/other/topic", "test_msgs/msg/BasicTypes");

// yet another alternative, writing the rclcpp::SerializedMessage directly
std::shared_ptr<rclcpp::SerializedMessage> serialized_msg2 =
std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&test_msg, serialized_msg2.get());

writer.write(
serialized_msg, "/yet/another/topic", "test_msgs/msg/BasicTypes",
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// writing a non-serialized message
Expand Down
30 changes: 3 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "rclcpp/logging.hpp"
#include "rclcpp/clock.hpp"

#include "rosbag2_cpp/writer.hpp"

Expand Down Expand Up @@ -234,33 +235,8 @@ Recorder::create_subscription(
topic_name,
topic_type,
qos,
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
// the serialized bag message takes ownership of the incoming rclcpp serialized message
// we therefore have to make sure to cleanup that memory in a custom deleter.
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_transport"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});
*bag_message->serialized_data = message->release_rcl_serialized_message();
bag_message->topic_name = topic_name;
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
this->get_logger(),
"Error getting current time. Error:" << rcutils_get_error_string().str);
}
bag_message->time_stamp = time_stamp;

writer_->write(bag_message);
[this, topic_name, topic_type](std::shared_ptr<rclcpp::SerializedMessage> message) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
});
return subscription;
}
Expand Down