Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared Memory on C++ API #363

Open
wants to merge 29 commits into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f59f93a
C++ SHM
yellowhatter Dec 27, 2024
449ae80
fix default shm msg size threshold
yellowhatter Dec 27, 2024
5a5fa3a
code format fix
yellowhatter Dec 27, 2024
80e2326
add lost tracepoints
yellowhatter Dec 27, 2024
fd9e715
code format
yellowhatter Dec 27, 2024
8b2284f
remove unnecessry flags from zenoh-cpp vendor
yellowhatter Dec 27, 2024
048e158
review fixes
yellowhatter Dec 27, 2024
6a20e70
codestyle fixes
yellowhatter Dec 27, 2024
6591885
Change 2023 -> 2024 in copyright
yellowhatter Dec 27, 2024
f12358c
Remove obsolete comment from default config
yellowhatter Dec 27, 2024
4d9f122
include <cstddef>
yellowhatter Jan 6, 2025
4fd8bce
Merge commit '2429bf27c798af7be0b759bea1d770152b0f5042'
yellowhatter Jan 6, 2025
bfc2983
style fix
yellowhatter Jan 6, 2025
dfbfec1
bugfixed zenoh
yellowhatter Jan 6, 2025
1d8a5e5
fix: Update VCS_VERSION for zenoh_cpp_vendor to latest commit (#52)
imstevenpmwork Jan 6, 2025
d01c019
Merge commit '0ef1d11cfa09fb6d409706c48a8e471bb72fef68'
yellowhatter Jan 8, 2025
fe89b2c
bump zenoh
yellowhatter Jan 9, 2025
131cac9
- non-blocking allocation for SHM
yellowhatter Jan 9, 2025
89735b1
change defaulkt shm size to 16mb to fit big topics
yellowhatter Jan 10, 2025
1625212
Merge commit '3739a0f1bceaed7e923d8f5d25b9db1f24f1d1fc'
yellowhatter Jan 13, 2025
25fc628
Merge commit 'f040f5cc2d2a2a4d7c6e8713e9526a6273b95427'
Jan 14, 2025
c744418
Merge commit '2930620850d09474b3178518f8fcd66b19fc9967'
yellowhatter Jan 16, 2025
4e71886
Merge commit 'bda2d7181d7e1ea2f7d143fabf7c4ffc6c670b20'
yellowhatter Jan 21, 2025
f5520c3
remove RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
yellowhatter Jan 28, 2025
681f942
up zenoh (optimized SHM subsystem)
yellowhatter Jan 30, 2025
ccadc0c
Update CMakeLists.txt
yellowhatter Jan 30, 2025
94658e4
Merge commit 'e638f8c5ea79d614596ea4dfa89f07dc8065ad93'
yellowhatter Jan 30, 2025
3af6854
Update CMakeLists.txt
yellowhatter Jan 30, 2025
61056c5
up zenoh
yellowhatter Jan 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ add_library(rmw_zenoh_cpp SHARED
src/detail/rmw_subscription_data.cpp
src/detail/service_type_support.cpp
src/detail/simplified_xxhash3.cpp
src/detail/shm_context.cpp
src/detail/type_support.cpp
src/detail/type_support_common.cpp
src/detail/zenoh_config.cpp
Expand Down
4 changes: 1 addition & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_ROUTER_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,7 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
4 changes: 1 addition & 3 deletions rmw_zenoh_cpp/config/DEFAULT_RMW_ZENOH_SESSION_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,7 @@
/// A probing procedure for shared memory is performed upon session opening. To enable zenoh to operate
/// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
/// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
///
/// ROS setting: disabled by default until fully tested
enabled: false,
enabled: true,
},
auth: {
/// The configuration of authentication.
Expand Down
37 changes: 18 additions & 19 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,19 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
}
}

// Initialize the shm manager if shared_memory is enabled in the config.
shm_provider_ = std::nullopt;
#ifndef _MSC_VER
if (shm_enabled == "true") {
auto layout = zenoh::MemoryLayout(
SHM_BUFFER_SIZE_MB * 1024 * 1024,
zenoh::AllocAlignment({5}));
zenoh::PosixShmProvider provider(layout, &result);
if (result != Z_OK) {
throw std::runtime_error("Unable to create shm provider.");
}
shm_provider_ = std::move(provider);
// Initialize the shm subsystem if shared_memory is enabled in the config
if (rmw_zenoh_cpp::zenoh_shm_enabled()) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled");

shm_ = std::make_optional(
rmw_zenoh_cpp::ShmContext(
rmw_zenoh_cpp::zenoh_shm_alloc_size(),
rmw_zenoh_cpp::zenoh_shm_message_size_threshold()
));
} else {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is disabled");
}
#endif

graph_guard_condition_ = std::make_unique<rmw_guard_condition_t>();
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
graph_guard_condition_->data = &guard_condition_data_;
Expand Down Expand Up @@ -262,10 +261,10 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
return session_;
}

std::optional<zenoh::ShmProvider> & shm_provider()
std::optional<rmw_zenoh_cpp::ShmContext> & shm()
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return shm_provider_;
return shm_;
}

rmw_guard_condition_t * graph_guard_condition()
Expand Down Expand Up @@ -398,9 +397,9 @@ class rmw_context_impl_s::Data final : public std::enable_shared_from_this<Data>
std::string enclave_;
// An owned session.
std::shared_ptr<zenoh::Session> session_;
// An optional SHM manager that is initialized of SHM is enabled in the
// An optional SHM context that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<zenoh::ShmProvider> shm_provider_;
std::optional<rmw_zenoh_cpp::ShmContext> shm_;
// Graph cache.
std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache_;
// ROS graph liveliness subscriber.
Expand Down Expand Up @@ -457,9 +456,9 @@ const std::shared_ptr<zenoh::Session> rmw_context_impl_s::session() const
}

///=============================================================================
std::optional<zenoh::ShmProvider> & rmw_context_impl_s::shm_provider()
std::optional<rmw_zenoh_cpp::ShmContext> & rmw_context_impl_s::shm()
{
return data_->shm_provider();
return data_->shm();
}

///=============================================================================
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ struct rmw_context_impl_s final
// Loan the Zenoh session.
const std::shared_ptr<zenoh::Session> session() const;

// Get a reference to the shm_provider.
// Get a reference to the shm subsystem.
// Note: This is not thread-safe.
// TODO(Yadunund): Remove this API and instead include a publish() API
// that handles the shm_provider once the context manages publishers.
std::optional<zenoh::ShmProvider> & shm_provider();
std::optional<rmw_zenoh_cpp::ShmContext> & shm();

// Get the graph guard condition.
rmw_guard_condition_t * graph_guard_condition();
Expand Down
100 changes: 80 additions & 20 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ PublisherData::PublisherData(
///=============================================================================
rmw_ret_t PublisherData::publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const std::optional<ShmContext> & shm
)
{
std::lock_guard<std::mutex> lock(mutex_);
if (is_shutdown_) {
Expand All @@ -218,19 +219,49 @@ rmw_ret_t PublisherData::publish(
// To store serialized message byte array.
char * msg_bytes = nullptr;

std::optional<zenoh::ZShmMut> shmbuf = std::nullopt;

rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;

auto always_free_msg_bytes = rcpputils::make_scope_exit(
[&msg_bytes, allocator]() {
if (msg_bytes) {
[&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
allocator->deallocate(msg_bytes, allocator->state);
}
});

// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
// Get memory from SHM buffer if available.
if (shm.has_value() && max_data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag(
max_data_length,
zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
msg_bytes = reinterpret_cast<char *>(buf.data());
shmbuf = std::make_optional(std::move(buf));
} else {
// Print a warning and revert to regular allocation
RMW_ZENOH_LOG_DEBUG_NAMED(
"rmw_zenoh_cpp", "Failed to allocate a SHM buffer, fallback to non-SHM");

// TODO(yellowhatter): split the whole publish method onto shm and non-shm versions
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}
} else {
// Get memory from the allocator.
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
RMW_CHECK_FOR_NULL_WITH_MSG(
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
}

// Object that manages the raw buffer
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
Expand All @@ -257,11 +288,11 @@ rmw_ret_t PublisherData::publish(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

// TODO(ahcorde): shmbuf
std::vector<uint8_t> raw_data(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
zenoh::Bytes payload(std::move(raw_data));
auto payload = shmbuf.has_value() ? zenoh::Bytes(std::move(*shmbuf)) :
zenoh::Bytes(
std::vector<uint8_t>(
reinterpret_cast<const uint8_t *>(msg_bytes),
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), ros_message, source_timestamp);
Expand All @@ -283,7 +314,7 @@ rmw_ret_t PublisherData::publish(
///=============================================================================
rmw_ret_t PublisherData::publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & /*shm_provider*/)
const std::optional<ShmContext> & shm)
{
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
Expand All @@ -305,14 +336,43 @@ rmw_ret_t PublisherData::publish_serialized_message(
options.attachment = rmw_zenoh_cpp::AttachmentData(
sequence_number_++, source_timestamp, entity_->copy_gid()).serialize_to_zbytes();

std::vector<uint8_t> raw_data(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(std::move(raw_data));
// Get memory from SHM buffer if available.
if (shm.has_value() && data_length >= shm.value().msgsize_threshold) {
RMW_ZENOH_LOG_DEBUG_NAMED("rmw_zenoh_cpp", "SHM is enabled.");

auto & provider = shm.value().shm_provider;

// TODO(yellowhatter): SHM, use alignment based on msgsize_threshold
auto alloc_result = provider.alloc_gc_defrag_blocking(data_length, zenoh::AllocAlignment({0}));

if (std::holds_alternative<zenoh::ZShmMut>(alloc_result)) {
auto && buf = std::get<zenoh::ZShmMut>(std::move(alloc_result));
auto msg_bytes = reinterpret_cast<char *>(buf.data());
memcpy(msg_bytes, serialized_message->buffer, data_length);
zenoh::Bytes payload(std::move(buf));

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message,
source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
} else {
// TODO(Yadunund): Should we revert to regular allocation and not return an error?
RMW_SET_ERROR_MSG("Failed to allocate a SHM buffer, even after GCing.");
return RMW_RET_ERROR;
}
} else {
std::vector<uint8_t> raw_image(
serialized_message->buffer,
serialized_message->buffer + data_length);
zenoh::Bytes payload(raw_image);

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);

pub_.put(std::move(payload), std::move(options), &result);
}

TRACETOOLS_TRACEPOINT(
rmw_publish, static_cast<const void *>(rmw_publisher_), serialized_message, source_timestamp);
pub_.put(std::move(payload), std::move(options), &result);
if (result != Z_OK) {
if (result == Z_ESESSION_CLOSED) {
RMW_ZENOH_LOG_WARN_NAMED(
Expand Down
7 changes: 5 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_publisher_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "event.hpp"
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down Expand Up @@ -57,12 +58,14 @@ class PublisherData final
// Publish a ROS message.
rmw_ret_t publish(
const void * ros_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const std::optional<ShmContext> & shm
);

// Publish a serialized ROS message.
rmw_ret_t publish_serialized_message(
const rmw_serialized_message_t * serialized_message,
std::optional<zenoh::ShmProvider> & shm_provider);
const std::optional<ShmContext> & shm
);

// Get a copy of the keyexpr_hash of this PublisherData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "liveliness_utils.hpp"
#include "message_type_support.hpp"
#include "attachment_helpers.hpp"
#include "shm_context.hpp"
#include "type_support_common.hpp"
#include "zenoh_utils.hpp"

Expand Down
29 changes: 29 additions & 0 deletions rmw_zenoh_cpp/src/detail/shm_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdexcept>

#include "shm_context.hpp"

namespace rmw_zenoh_cpp
{
///=============================================================================
ShmContext::ShmContext(size_t alloc_size, size_t msgsize_threshold)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include <cstddef>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in shm_context.hpp

: // Create Layout for provider's memory
// Provider's alignment is 1 (=2^0) bytes as we are going to make only 1-byte aligned allocations
// TODO(yellowhatter): use zenoh_shm_message_size_threshold as base for alignment
shm_provider(zenoh::PosixShmProvider(zenoh::MemoryLayout(alloc_size, zenoh::AllocAlignment {0}))),
msgsize_threshold(msgsize_threshold)
{}
} // namespace rmw_zenoh_cpp
34 changes: 34 additions & 0 deletions rmw_zenoh_cpp/src/detail/shm_context.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DETAIL__SHM_CONTEXT_HPP_
#define DETAIL__SHM_CONTEXT_HPP_

#include <cstddef>

#include <zenoh.hxx>

namespace rmw_zenoh_cpp
{
///=============================================================================
struct ShmContext
{
zenoh::PosixShmProvider shm_provider;
size_t msgsize_threshold;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include <cstddef>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


ShmContext(size_t alloc_size, size_t msgsize_threshold);
};
} // namespace rmw_zenoh_cpp

#endif // DETAIL__SHM_CONTEXT_HPP_
Loading