Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
*/
void close() override;

/**
* Check if write processing needs to have the ownership of serialized message
*
* \return true if writer processing needs the ownership. Otherwise, return false.
*/
bool request_owned_serialized_data() override;

protected:
/**
* Compress a file and update the metadata file path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ void SequentialCompressionWriter::close()
storage_factory_.reset();
}

bool SequentialCompressionWriter::request_owned_serialized_data()
{
return compression_options_.compression_mode == CompressionMode::MESSAGE ||
SequentialWriter::request_owned_serialized_data();
}

void SequentialCompressionWriter::create_topic(
const rosbag2_storage::TopicMetadata & topic_with_type)
{
Expand Down
5 changes: 4 additions & 1 deletion rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ROSBAG2_CPP_PUBLIC Writer final
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
* \throws runtime_error if the Writer is not open or duplicating message is failed.
*/
void write(
const rclcpp::SerializedMessage & message,
Expand Down Expand Up @@ -183,6 +183,9 @@ class ROSBAG2_CPP_PUBLIC Writer final
private:
std::mutex writer_mutex_;
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl_;
void copy_message_into(
const rclcpp::SerializedMessage & message,
std::shared_ptr<rosbag2_storage::SerializedBagMessage> & serialized_bag_message);
};

} // namespace rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
virtual bool take_snapshot() = 0;

virtual bool request_owned_serialized_data() = 0;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: This feels very much like an under-the-hood implementation detail. It would be nice if we could avoid exposing it to the global interface. However - I don't know if we can do that right now. We may just need to wait until we remove the write(SerializedBagMessage &) interface.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I not find better way instead of adding hook function to global interface.

};

} // namespace writer_interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
bool take_snapshot() override;

/**
* Check if write processing needs to have the ownership of serialized message
*
* \return true if writer processing needs the ownership. Otherwise, return false.
*/
bool request_owned_serialized_data() override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand Down
54 changes: 49 additions & 5 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "rclcpp/logging.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

Expand Down Expand Up @@ -117,14 +119,56 @@ void Writer::write(
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

// temporary store the payload in a shared_ptr.
// add custom no-op deleter to avoid deep copying data.
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
if (writer_impl_->request_owned_serialized_data()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are adding these workarounds, can you also mark this function signature as [[deprecated()]] in the header? Based on the need to duplicate every message like this, I think we want to discourage its use and remove it later

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this interface is called now by below template interface

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \param message MessageT The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
template<class MessageT>
void write(
const MessageT & message,
const std::string & topic_name,
const rclcpp::Time & time)
{
rclcpp::SerializedMessage serialized_msg;
rclcpp::Serialization<MessageT> serialization;
serialization.serialize_message(&message, &serialized_msg);
return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);
}

So I think we also need to modify this template interface.

I want to use this new interface to replace this interface to be deprecated.
https://github.com/Barry-Xu-2018/rosbag2/blob/5f18ae59f1358ae045e1e4b14afadbc333a8737d/rosbag2_cpp/include/rosbag2_cpp/writer.hpp#L149-L166

New interface also can be called instead of below codes.

[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
// the serialized bag message takes ownership of the incoming rclcpp serialized message
// we therefore have to make sure to cleanup that memory in a custom deleter.
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_transport"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});
*bag_message->serialized_data = message->release_rcl_serialized_message();
bag_message->topic_name = topic_name;
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
this->get_logger(),
"Error getting current time. Error:" << rcutils_get_error_string().str);
}
bag_message->time_stamp = time_stamp;

What do you think ?

// Need owned serialized data, so duplicate message
copy_message_into(message, serialized_bag_message);
} else {
// temporary store the payload in a shared_ptr.
// add custom no-op deleter to avoid deep copying data.
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
}

return write(
serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::copy_message_into(
const rclcpp::SerializedMessage & message,
std::shared_ptr<rosbag2_storage::SerializedBagMessage> & serialized_bag_message)
{
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

rcutils_allocator_t allocator = rcutils_get_default_allocator();

rcutils_ret_t ret = rcutils_uint8_array_init(
serialized_bag_message->serialized_data.get(),
message.get_rcl_serialized_message().buffer_capacity,
&allocator);
if (ret != RCUTILS_RET_OK) {
auto err = std::string("Failed to call rcutils_uint8_array_init(): return ");
err += std::to_string(ret);
throw std::runtime_error(err);
}

std::memcpy(
serialized_bag_message->serialized_data.get()->buffer,
message.get_rcl_serialized_message().buffer,
message.get_rcl_serialized_message().buffer_length);

serialized_bag_message->serialized_data.get()->buffer_length =
message.get_rcl_serialized_message().buffer_length;
}

} // namespace rosbag2_cpp
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ bool SequentialWriter::take_snapshot()
return true;
}

bool SequentialWriter::request_owned_serialized_data()
{
return converter_ == nullptr && storage_options_.max_cache_size != 0u;
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage>
SequentialWriter::get_writeable_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn
return true;
}

bool request_owned_serialized_data() override
{
return false;
}

const std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> & get_messages()
{
return messages_;
Expand Down