generated from icsl-Jeon/simple-ros2-package
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmulti_thread_mutually_exclusive_subscriber_node.cc
138 lines (117 loc) · 4.46 KB
/
multi_thread_mutually_exclusive_subscriber_node.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include <chrono>
#include <random>
#include <thread>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
using namespace std::chrono_literals;
class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
public:
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
: Node("subscriber_node",
node_options.allow_undeclared_parameters(true)) {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
this, std::placeholders::_1),
options);
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
this, std::placeholders::_1),
options);
timer_ = create_wall_timer(
1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
this));
auto param_change_callback =
[this](std::vector<rclcpp::Parameter> parameters) {
for (auto parameter : parameters) {
if (parameter.get_name() == "use_mutex") {
use_mutex = parameter.as_bool();
if (use_mutex)
RCLCPP_INFO(get_logger(), "will use mutex");
else
RCLCPP_INFO(get_logger(), "will not mutex");
}
}
auto result = rcl_interfaces::msg::SetParametersResult();
result.successful = true;
return result;
};
callback_handler =
this->add_on_set_parameters_callback(param_change_callback);
}
private:
bool use_mutex{false};
struct {
std::mutex short_topic_mutex;
std::mutex long_topic_mutex;
} mutex_list;
std::string processed_short_string_;
std::string processed_long_string_;
void ProcessString(const std::string &raw_string, std::string &output);
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
rclcpp::TimerBase::SharedPtr timer_;
OnSetParametersCallbackHandle::SharedPtr callback_handler;
void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void TimerCallback();
};
void MultiThreadMutuallyExclusiveSubscriber::ProcessString(
const std::string &raw_string, std::string &output) {
output = "** ";
for (char c : raw_string) {
output.push_back(c);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
output += " **";
};
void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
std::shared_ptr<std::unique_lock<std::mutex>> lock;
if (use_mutex)
mutex_list.short_topic_mutex.lock();
ProcessString(msg->data, processed_short_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_short_string_.c_str());
if (use_mutex)
mutex_list.short_topic_mutex.unlock();
}
void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
if (use_mutex)
mutex_list.long_topic_mutex.lock();
ProcessString(msg->data, processed_long_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_long_string_.c_str());
if (use_mutex)
mutex_list.long_topic_mutex.unlock();
}
void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() {
if (use_mutex) {
mutex_list.short_topic_mutex.lock();
mutex_list.long_topic_mutex.lock();
}
RCLCPP_INFO(get_logger(),
"Getting processed strings: \n [long] %s \n [short] %s",
processed_long_string_.c_str(), processed_short_string_.c_str());
if (use_mutex) {
mutex_list.short_topic_mutex.unlock();
mutex_list.long_topic_mutex.unlock();
}
}
int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::NodeOptions option;
auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}