Skip to content

Topic fix writer interface issue 2#867

Closed
Barry-Xu-2018 wants to merge 9 commits intoros2:masterfrom
Barry-Xu-2018:topic-fix-writer-interface-issue-2
Closed

Topic fix writer interface issue 2#867
Barry-Xu-2018 wants to merge 9 commits intoros2:masterfrom
Barry-Xu-2018:topic-fix-writer-interface-issue-2

Conversation

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor

Address #856

Comparing with fix #866, add new interface is_async_process_message to BaseWriterInterface.
This new interface can help to get information about if the processing message is asynchronous.
So we can know whether message need to be duplicated.

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

Change interface name from is_async_process_message to request_owned_serialized_data.

Copy link
Copy Markdown
Contributor

@jhdcs jhdcs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks fairly good to me. Just a few mild suggestions. The one I'm most interested in seeing is the preservation of the comment in writer.cpp, the others are mostly just mild style stuff.

Copy link
Copy Markdown
Contributor

@jhdcs jhdcs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me!

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

@jhdcs

I added a new commit since I found below case.
If compression mode is file, SequentialCompressionWriter will call SequentialWriter::write.

void SequentialCompressionWriter::write(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)
{
// If the compression mode is FILE, write as normal here. Compressing files doesn't
// occur until after the bag file is split.
// If the compression mode is MESSAGE, push the message into a queue that will be handled
// by the compression threads.
if (compression_options_.compression_mode == CompressionMode::FILE) {
SequentialWriter::write(message);
} else {

So for this case, need to call SequentialWriter::request_owned_serialized_data() to check.

[](rcutils_uint8_array_t * /* data */) {});
if (writer_impl_->request_owned_serialized_data()) {
// Need owned serialized data, so duplicate message
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pull this duplicating logic into a separate helper function for readability. I think this should look something like:

std::shared_ptr<rosbag2_storage::SerializedBagMessage> serialized_bag_message;
if (writer_impl_->needs_message_ownership()) {
  copy_message_into(message, serialized_bag_message);
} else {
  // temporary store the payload in a shared_ptr.
  // add custom no-op deleter to avoid deep copying data.
  serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
      const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
      [](rcutils_uint8_array_t * /* data */) {});
}

return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
if (writer_impl_->request_owned_serialized_data()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are adding these workarounds, can you also mark this function signature as [[deprecated()]] in the header? Based on the need to duplicate every message like this, I think we want to discourage its use and remove it later

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this interface is called now by below template interface

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \param message MessageT The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
template<class MessageT>
void write(
const MessageT & message,
const std::string & topic_name,
const rclcpp::Time & time)
{
rclcpp::SerializedMessage serialized_msg;
rclcpp::Serialization<MessageT> serialization;
serialization.serialize_message(&message, &serialized_msg);
return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);
}

So I think we also need to modify this template interface.

I want to use this new interface to replace this interface to be deprecated.
https://github.com/Barry-Xu-2018/rosbag2/blob/5f18ae59f1358ae045e1e4b14afadbc333a8737d/rosbag2_cpp/include/rosbag2_cpp/writer.hpp#L149-L166

New interface also can be called instead of below codes.

[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
// the serialized bag message takes ownership of the incoming rclcpp serialized message
// we therefore have to make sure to cleanup that memory in a custom deleter.
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_transport"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});
*bag_message->serialized_data = message->release_rcl_serialized_message();
bag_message->topic_name = topic_name;
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
this->get_logger(),
"Error getting current time. Error:" << rcutils_get_error_string().str);
}
bag_message->time_stamp = time_stamp;

What do you think ?


virtual void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) = 0;

virtual bool request_owned_serialized_data() = 0;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: This feels very much like an under-the-hood implementation detail. It would be nice if we could avoid exposing it to the global interface. However - I don't know if we can do that right now. We may just need to wait until we remove the write(SerializedBagMessage &) interface.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I not find better way instead of adding hook function to global interface.

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

Friendly ping @emersonknapp

… mode to write data

Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
Signed-off-by: Barry Xu <barry.xu@sony.com>
@Barry-Xu-2018 Barry-Xu-2018 force-pushed the topic-fix-writer-interface-issue-2 branch from deb554d to bad0909 Compare September 28, 2021 10:06
@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

Do rebase

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

There is new way to avoid adding a new interface.
I will try to implement it and create another PR.

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

Barry-Xu-2018 commented Sep 30, 2021

@emersonknapp @jhdcs

After deep consider, I think we should not modify so much codes for a deprecated interface. So I want to discard this commit.
Please look my PR in #866. (45c9a78)
I add new write interface instead of old one.
For the old interface, we didn't have to make a lot of code changes to fix the problems found.

@emersonknapp
Copy link
Copy Markdown
Collaborator

@Barry-Xu-2018 I have approved the other PR and will merge after it passes CI. Do we still need this PR at all?

@Barry-Xu-2018
Copy link
Copy Markdown
Contributor Author

@emersonknapp

@Barry-Xu-2018 I have approved the other PR and will merge after it passes CI. Do we still need this PR at all?

No. I close this PR.

@Barry-Xu-2018 Barry-Xu-2018 deleted the topic-fix-writer-interface-issue-2 branch March 4, 2025 08:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants