Skip to content

Commit 933b802

Browse files
alsorawjwwood
authored andcommitted
basic ipc implementation from alsora/new_ipc_proposal
Signed-off-by: alberto <[email protected]> better use of node_topic create subscription Signed-off-by: alberto <[email protected]> added intra process manager test Signed-off-by: alberto <[email protected]> fixed ring buffer and added test Signed-off-by: alberto <[email protected]> added intra process buffer test Signed-off-by: alberto <[email protected]> added intra process buffer test Signed-off-by: alberto <[email protected]> Signed-off-by: alberto <[email protected]> removed intra-process methods from subscription base Signed-off-by: alberto <[email protected]> using lock_guard instead of unique_lock, renamed var without camel case Signed-off-by: alberto <[email protected]> using unordered set and references in intra process manager Signed-off-by: alberto <[email protected]> subscription intra-process does not depend anymore on subscription, but has a copy of the callback Signed-off-by: alberto <[email protected]> changed buffer API to use rvo Signed-off-by: Alberto <[email protected]> avoid copying shared_ptr Signed-off-by: alberto <[email protected]> revert not needed changes to create_subscription Signed-off-by: alberto <[email protected]> updated tests according to new buffer APIs Signed-off-by: alberto <[email protected]> updated types in ring buffer implementation avoid using uint32_t Signed-off-by: alberto <[email protected]> using unique ptr for buffers in subscription_intra_process Signed-off-by: alberto <[email protected]> added missing std::move in subscription_intra_process constructor Signed-off-by: alberto <[email protected]> use consisting names for ring_buffer_implementation members Signed-off-by: alberto <[email protected]> addressing typos, one-liners and similar from ivanpauno review Signed-off-by: alberto <[email protected]> moved subscription_intra_process_base to its own files and moved non templated method from derived class Signed-off-by: alberto <[email protected]> removed forward declarations, fixed include subscription_intra_process_base Signed-off-by: alberto <[email protected]> removed member variable from do_intra_process_publish signature Signed-off-by: alberto <[email protected]> declare public before private in intra_process_manager_impl Signed-off-by: alberto <[email protected]> made matches_any_intra_process_publishers const Signed-off-by: alberto <[email protected]> using const reference in get_all_matching_publishers Signed-off-by: alberto <[email protected]> added deleter and alloc templates in intra_process_buffer Signed-off-by: alberto <[email protected]> added RCLCPP_WARN to intra_process_manager_impl Signed-off-by: alberto <[email protected]> passing context from node to subscription_intra_process Signed-off-by: alberto <[email protected]> using allocators in intra_process_manager Signed-off-by: alberto <[email protected]> use size_t instead of int in ring buffer indices Signed-off-by: alberto <[email protected]> creating buffer inside subscription_intra_process constructor Signed-off-by: alberto <[email protected]> fix lint errors Signed-off-by: alberto <[email protected]> throw error if trying to dequeue when buffer empty; remove duplicated methods in intra_process_buffer Signed-off-by: alberto <[email protected]> added todo for creating an rmw function for checking qos compatibility Signed-off-by: alberto <[email protected]> test fixes Signed-off-by: alberto <[email protected]> refactored intra_process_manager, removed ipm impl Signed-off-by: alberto <[email protected]> added mutex in intra_process_manager add_* methods Signed-off-by: Soragna, Alberto <[email protected]> added allocator to intra_process_buffer Signed-off-by: Soragna, Alberto <[email protected]> added invalid intra_process qos test for subscription Signed-off-by: Soragna, Alberto <[email protected]> throw error if history size is 0 with keep last and ipc Signed-off-by: Soragna, Alberto <[email protected]> using allocator when creating unique_ptr from shared_ptr Signed-off-by: Soragna, Alberto <[email protected]> adding deleter template argument to intra_process buffer Signed-off-by: Soragna, Alberto <[email protected]> fix linter Signed-off-by: Soragna, Alberto <[email protected]> throw error with callbackT different from messageT Signed-off-by: Soragna, Alberto <[email protected]> updated deleter template argument in subscription factory Signed-off-by: Soragna, Alberto <[email protected]> Fix typo in test fixture tear down method name (#787) Signed-off-by: Jacob Perron <[email protected]> Add free function for creating service clients (#788) Equivalent to the free function for creating a service. Resolves #768 Signed-off-by: Jacob Perron <[email protected]> Cmake infrastructure for creating components (#784) *cmake macro to create components for libraries with multiple nodes Signed-off-by: Siddharth Kucheria <[email protected]> Allow registering multiple on_parameters_set_callback (#772) Signed-off-by: ivanpauno <[email protected]> fix for multiple nodes not being recognized (#790) Signed-off-by: Siddharth Kucheria <[email protected]> Remove non-package from ament_target_dependencies() (#793) Signed-off-by: Shane Loretz <[email protected]> fix linter issue (#795) Signed-off-by: Siddharth Kucheria <[email protected]> Make TimeSource ignore use_sim_time events coming from other nodes. (#799) Signed-off-by: Michel Hidalgo <[email protected]> passing deleter template parameter Signed-off-by: Soragna, Alberto <[email protected]> small fixes for failing tests Signed-off-by: Soragna, Alberto <[email protected]> fixed imports in test_intra_process_manager Signed-off-by: Soragna, Alberto <[email protected]> using RCLCPP_SMART_PTR_ALIASES_ONLY and RCLCPP_PUBLIC macros Signed-off-by: Soragna, Alberto <[email protected]> added RCLCPP_PUBLIC macros and virtual destructor to sub intra_process base Signed-off-by: Soragna, Alberto <[email protected]> added unique_ptr alias to macros Signed-off-by: Soragna, Alberto <[email protected]> updated test_intra_process_manager.cpp Signed-off-by: Soragna, Alberto <[email protected]> remove mock msgs from rclcpp (#800) Signed-off-by: Karsten Knese <[email protected]> Add line break after first open paren in multiline function call (#785) * Add line break after first open paren in multiline function call as per developer guide: https://index.ros.org/doc/ros2/Contributing/Developer-Guide/#open-versus-cuddled-braces see ament/ament_lint#148 Signed-off-by: Dan Rose <[email protected]> Fix dedent when first function argument starts with a brace Signed-off-by: Dan Rose <[email protected]> Line break with multiline if condition Remove line breaks where allowed. Signed-off-by: Dan Rose <[email protected]> Fixup after rebase Signed-off-by: Dan Rose <[email protected]> Fixup again after reverting indent_paren_open_brace Signed-off-by: Dan Rose <[email protected]> * Revert comment spacing change, condense some lines Signed-off-by: Dan Rose <[email protected]> Adapt to '--ros-args ... [--]'-based ROS args extraction (#816) * Use --ros-args to deal with node arguments in rclcpp. Signed-off-by: Michel Hidalgo <[email protected]> * Document implicit --ros-args flag in NodeOptions::arguments(). Signed-off-by: Michel Hidalgo <[email protected]> * Add missing size_t to int cast. Signed-off-by: Michel Hidalgo <[email protected]> * Only add implicit --ros-args flag if not present already. Signed-off-by: Michel Hidalgo <[email protected]> * Add some rclcpp::NodeOptions test coverage. Signed-off-by: Michel Hidalgo <[email protected]> * Address peer review comments. Signed-off-by: Michel Hidalgo <[email protected]> * Please cpplint and uncrustify. Signed-off-by: Michel Hidalgo <[email protected]> Guard against making multiple result requests for a goal handle (#808) This fixes a runtime error caused by a race condition when making consecutive requests for the result. Specifically, this happens if the user provides a result callback when sending a goal and then calls async_get_result shortly after. Resolves #783 Signed-off-by: Jacob Perron <[email protected]> Explain return value of spin_until_future_complete (#792) Signed-off-by: Dan Rose <[email protected]> Allow passing logger by const ref (#820) Signed-off-by: Karsten Knese <[email protected]> Delete unnecessary call for get_node_by_group (#823) Signed-off-by: Tomoya.Fujita <[email protected]> Fix get_node_interfaces functions taking a pointer (#821) Signed-off-by: ivanpauno <[email protected]> add callback group as member variable and constructor arg (#811) Signed-off-by: bpwilcox <[email protected]> remove callback group as member variable Wrap documentation examples in code blocks (#830) This makes the code examples easier to read in the generated documentation. Signed-off-by: Jacob Perron <[email protected]> Crash in callback group pointer vector iterator (#814) Signed-off-by: Guillaume Autran <[email protected]> add mutex in add/remove_node and wait_for_work to protect concurrent use/change of memory_strategy_ (#837) Signed-off-by: Dirk Thomas <[email protected]> Fix hang with timers in MultiThreadedExecutor (#835) (#836) Signed-off-by: Todd Malsbary <[email protected]> Use of -r/--remap flags where appropriate. (#834) Signed-off-by: Michel Hidalgo <[email protected]> Force explicit --ros-args in NodeOptions::arguments(). (#845) Signed-off-by: Michel Hidalgo <[email protected]> Fail on invalid and unknown ROS specific arguments (#842) * Fail on invalid and unknown ROS specific arguments. Signed-off-by: Michel Hidalgo <[email protected]> * Revert changes to utilities.hpp in rclcpp Signed-off-by: Michel Hidalgo <[email protected]> * Fully revert change to utilities.hpp Signed-off-by: Michel Hidalgo <[email protected]> Fix typo in deprecated warning. (#848) "it's" instead of its Signed-off-by: Luca Della Vedova <[email protected]> Add throwing parameter name if parameter is not set (#833) * added throwing parameter name if parameter is not set Signed-off-by: Alex <[email protected]> Signed-off-by: ivanpauno <[email protected]> check valid timer handler 1st to reduce the time window for scan. (#841) Signed-off-by: Tomoya.Fujita <[email protected]> remove features and related code which were deprecated in dashing (#852) Signed-off-by: William Woodall <[email protected]> reset error message before setting a new one, embed the original one (#854) Signed-off-by: Dirk Thomas <[email protected]> restored virtual destructor in publisher_base Signed-off-by: Soragna, Alberto <[email protected]>
1 parent 07cb443 commit 933b802

38 files changed

+2255
-2400
lines changed

rclcpp/CMakeLists.txt

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ set(${PROJECT_NAME}_SRCS
4646
src/rclcpp/graph_listener.cpp
4747
src/rclcpp/init_options.cpp
4848
src/rclcpp/intra_process_manager.cpp
49-
src/rclcpp/intra_process_manager_impl.cpp
5049
src/rclcpp/logger.cpp
5150
src/rclcpp/memory_strategies.cpp
5251
src/rclcpp/memory_strategy.cpp
@@ -74,6 +73,7 @@ set(${PROJECT_NAME}_SRCS
7473
src/rclcpp/service.cpp
7574
src/rclcpp/signal_handler.cpp
7675
src/rclcpp/subscription_base.cpp
76+
src/rclcpp/subscription_intra_process_base.cpp
7777
src/rclcpp/time.cpp
7878
src/rclcpp/time_source.cpp
7979
src/rclcpp/timer.cpp
@@ -196,25 +196,36 @@ if(BUILD_TESTING)
196196
"rosidl_typesupport_cpp"
197197
)
198198
endif()
199-
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
200-
if(TARGET test_mapped_ring_buffer)
201-
ament_target_dependencies(test_mapped_ring_buffer
199+
ament_add_gmock(test_intra_process_manager test/test_intra_process_manager.cpp)
200+
if(TARGET test_intra_process_manager)
201+
ament_target_dependencies(test_intra_process_manager
202202
"rcl"
203203
"rcl_interfaces"
204204
"rmw"
205205
"rosidl_generator_cpp"
206206
"rosidl_typesupport_cpp"
207207
)
208+
target_link_libraries(test_intra_process_manager ${PROJECT_NAME})
208209
endif()
209-
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
210-
if(TARGET test_intra_process_manager)
211-
ament_target_dependencies(test_intra_process_manager
212-
"rcl"
210+
ament_add_gtest(test_ring_buffer_implementation test/test_ring_buffer_implementation.cpp)
211+
if(TARGET test_ring_buffer_implementation)
212+
ament_target_dependencies(test_ring_buffer_implementation
213+
"rcl_interfaces"
214+
"rmw"
215+
"rosidl_generator_cpp"
216+
"rosidl_typesupport_cpp"
217+
)
218+
target_link_libraries(test_ring_buffer_implementation ${PROJECT_NAME})
219+
endif()
220+
ament_add_gtest(test_intra_process_buffer test/test_intra_process_buffer.cpp)
221+
if(TARGET test_intra_process_buffer)
222+
ament_target_dependencies(test_intra_process_buffer
213223
"rcl_interfaces"
214224
"rmw"
215225
"rosidl_generator_cpp"
216226
"rosidl_typesupport_cpp"
217227
)
228+
target_link_libraries(test_intra_process_buffer ${PROJECT_NAME})
218229
endif()
219230
ament_add_gtest(test_node test/test_node.cpp)
220231
if(TARGET test_node)

rclcpp/include/rclcpp/any_subscription_callback.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ class AnySubscriptionCallback
219219
}
220220
}
221221

222-
bool use_take_shared_method()
222+
bool use_take_shared_method() const
223223
{
224224
return const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_;
225225
}
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2015 Open Source Robotics Foundation, Inc.
1+
// Copyright 2019 Open Source Robotics Foundation, Inc.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -12,12 +12,26 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
#include "rclcpp/intra_process_manager_impl.hpp"
15+
#ifndef RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
16+
#define RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
1617

17-
#include <memory>
18+
namespace rclcpp
19+
{
20+
namespace intra_process_buffer
21+
{
1822

19-
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
20-
rclcpp::intra_process_manager::create_default_impl()
23+
template<typename BufferT>
24+
class BufferImplementationBase
2125
{
22-
return std::make_shared<IntraProcessManagerImpl<>>();
23-
}
26+
public:
27+
virtual BufferT dequeue() = 0;
28+
virtual void enqueue(BufferT request) = 0;
29+
30+
virtual void clear() = 0;
31+
virtual bool has_data() const = 0;
32+
};
33+
34+
} // namespace intra_process_buffer
35+
} // namespace rclcpp
36+
37+
#endif // RCLCPP__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Copyright 2019 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
16+
#define RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_
17+
18+
#include <memory>
19+
#include <type_traits>
20+
#include <utility>
21+
22+
#include "rclcpp/allocator/allocator_common.hpp"
23+
#include "rclcpp/allocator/allocator_deleter.hpp"
24+
#include "rclcpp/buffers/buffer_implementation_base.hpp"
25+
26+
namespace rclcpp
27+
{
28+
namespace intra_process_buffer
29+
{
30+
31+
class IntraProcessBufferBase
32+
{
33+
public:
34+
RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBufferBase)
35+
36+
virtual void clear() = 0;
37+
38+
virtual bool has_data() const = 0;
39+
virtual bool use_take_shared_method() const = 0;
40+
};
41+
42+
template<
43+
typename MessageT,
44+
typename Alloc = std::allocator<void>,
45+
typename MessageDeleter = std::default_delete<MessageT>>
46+
class IntraProcessBuffer : public IntraProcessBufferBase
47+
{
48+
public:
49+
RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBuffer)
50+
51+
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
52+
using MessageSharedPtr = std::shared_ptr<const MessageT>;
53+
54+
virtual void add_shared(MessageSharedPtr msg) = 0;
55+
virtual void add_unique(MessageUniquePtr msg) = 0;
56+
57+
virtual MessageSharedPtr consume_shared() = 0;
58+
virtual MessageUniquePtr consume_unique() = 0;
59+
};
60+
61+
template<
62+
typename MessageT,
63+
typename Alloc = std::allocator<void>,
64+
typename MessageDeleter = std::default_delete<MessageT>,
65+
typename BufferT = std::unique_ptr<MessageT>>
66+
class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, MessageDeleter>
67+
{
68+
public:
69+
RCLCPP_SMART_PTR_DEFINITIONS(TypedIntraProcessBuffer)
70+
71+
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
72+
using MessageAlloc = typename MessageAllocTraits::allocator_type;
73+
using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
74+
using MessageSharedPtr = std::shared_ptr<const MessageT>;
75+
76+
TypedIntraProcessBuffer(
77+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_impl,
78+
std::shared_ptr<Alloc> allocator = nullptr)
79+
{
80+
bool valid_type = (std::is_same<BufferT, MessageSharedPtr>::value ||
81+
std::is_same<BufferT, MessageUniquePtr>::value);
82+
if (!valid_type) {
83+
throw std::runtime_error("Creating TypedIntraProcessBuffer with not valid BufferT");
84+
}
85+
86+
buffer_ = std::move(buffer_impl);
87+
88+
if (!allocator) {
89+
message_allocator_ = std::make_shared<MessageAlloc>();
90+
} else {
91+
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
92+
}
93+
}
94+
95+
void add_shared(MessageSharedPtr msg)
96+
{
97+
add_shared_impl<BufferT>(std::move(msg));
98+
}
99+
100+
void add_unique(MessageUniquePtr msg)
101+
{
102+
buffer_->enqueue(std::move(msg));
103+
}
104+
105+
MessageSharedPtr consume_shared()
106+
{
107+
return consume_shared_impl<BufferT>();
108+
}
109+
110+
MessageUniquePtr consume_unique()
111+
{
112+
return consume_unique_impl<BufferT>();
113+
}
114+
115+
bool has_data() const
116+
{
117+
return buffer_->has_data();
118+
}
119+
120+
void clear()
121+
{
122+
buffer_->clear();
123+
}
124+
125+
bool use_take_shared_method() const
126+
{
127+
return std::is_same<BufferT, MessageSharedPtr>::value;
128+
}
129+
130+
private:
131+
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
132+
133+
std::shared_ptr<MessageAlloc> message_allocator_;
134+
135+
// MessageSharedPtr to MessageSharedPtr
136+
template<typename DestinationT>
137+
typename std::enable_if<
138+
std::is_same<DestinationT, MessageSharedPtr>::value
139+
>::type
140+
add_shared_impl(MessageSharedPtr shared_msg)
141+
{
142+
buffer_->enqueue(std::move(shared_msg));
143+
}
144+
145+
// MessageSharedPtr to MessageUniquePtr
146+
template<typename DestinationT>
147+
typename std::enable_if<
148+
std::is_same<DestinationT, MessageUniquePtr>::value
149+
>::type
150+
add_shared_impl(MessageSharedPtr shared_msg)
151+
{
152+
// This should not happen: here a copy is unconditionally made, while the intra-process manager
153+
// can decide whether a copy is needed depending on the number and the type of buffers
154+
155+
MessageUniquePtr unique_msg;
156+
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
157+
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
158+
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
159+
if (deleter) {
160+
unique_msg = MessageUniquePtr(ptr, *deleter);
161+
} else {
162+
unique_msg = MessageUniquePtr(ptr);
163+
}
164+
165+
buffer_->enqueue(std::move(unique_msg));
166+
}
167+
168+
// MessageSharedPtr to MessageSharedPtr
169+
template<typename OriginT>
170+
typename std::enable_if<
171+
(std::is_same<OriginT, MessageSharedPtr>::value),
172+
MessageSharedPtr
173+
>::type
174+
consume_shared_impl()
175+
{
176+
return buffer_->dequeue();
177+
}
178+
179+
// MessageUniquePtr to MessageSharedPtr
180+
template<typename OriginT>
181+
typename std::enable_if<
182+
(std::is_same<OriginT, MessageUniquePtr>::value),
183+
MessageSharedPtr
184+
>::type
185+
consume_shared_impl()
186+
{
187+
// automatic cast from unique ptr to shared ptr
188+
return buffer_->dequeue();
189+
}
190+
191+
// MessageSharedPtr to MessageUniquePtr
192+
template<typename OriginT>
193+
typename std::enable_if<
194+
(std::is_same<OriginT, MessageSharedPtr>::value),
195+
MessageUniquePtr
196+
>::type
197+
consume_unique_impl()
198+
{
199+
MessageSharedPtr buffer_msg = buffer_->dequeue();
200+
201+
MessageUniquePtr unique_msg;
202+
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(buffer_msg);
203+
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
204+
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *buffer_msg);
205+
if (deleter) {
206+
unique_msg = MessageUniquePtr(ptr, *deleter);
207+
} else {
208+
unique_msg = MessageUniquePtr(ptr);
209+
}
210+
211+
return unique_msg;
212+
}
213+
214+
// MessageUniquePtr to MessageUniquePtr
215+
template<typename OriginT>
216+
typename std::enable_if<
217+
(std::is_same<OriginT, MessageUniquePtr>::value),
218+
MessageUniquePtr
219+
>::type
220+
consume_unique_impl()
221+
{
222+
return buffer_->dequeue();
223+
}
224+
};
225+
226+
} // namespace intra_process_buffer
227+
} // namespace rclcpp
228+
229+
230+
#endif // RCLCPP__BUFFERS__INTRA_PROCESS_BUFFER_HPP_

0 commit comments

Comments
 (0)