Skip to content

Commit d12ed36

Browse files
author
hsgwa
authored
Allow timers to keep up the intended rate in MultiThreadedExecutor #1516 (#1636)
Backports #1516 and follow-up fix #1628 Patched to keep ABI compatibility by using static class variables to store the mutex two priorities instances. Signed-off-by: hsgwa <[email protected]>
1 parent 791c23a commit d12ed36

File tree

5 files changed

+208
-4
lines changed

5 files changed

+208
-4
lines changed

rclcpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS
3939
src/rclcpp/clock.cpp
4040
src/rclcpp/context.cpp
4141
src/rclcpp/contexts/default_context.cpp
42+
src/rclcpp/detail/mutex_two_priorities.cpp
4243
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
4344
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
4445
src/rclcpp/detail/rmw_implementation_specific_subscription_payload.cpp
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2021 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
16+
#define RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
17+
18+
#include <condition_variable>
19+
#include <mutex>
20+
21+
namespace rclcpp
22+
{
23+
namespace detail
24+
{
25+
/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
26+
/**
27+
* After the current mutex owner release the lock, a thread that used the high
28+
* priority mechanism will have priority over threads that used the low priority mechanism.
29+
*/
30+
class MutexTwoPriorities
31+
{
32+
public:
33+
class HighPriorityLockable
34+
{
35+
public:
36+
explicit HighPriorityLockable(MutexTwoPriorities & parent);
37+
38+
void lock();
39+
40+
void unlock();
41+
42+
private:
43+
MutexTwoPriorities & parent_;
44+
};
45+
46+
class LowPriorityLockable
47+
{
48+
public:
49+
explicit LowPriorityLockable(MutexTwoPriorities & parent);
50+
51+
void lock();
52+
53+
void unlock();
54+
55+
private:
56+
MutexTwoPriorities & parent_;
57+
};
58+
59+
HighPriorityLockable
60+
get_high_priority_lockable();
61+
62+
LowPriorityLockable
63+
get_low_priority_lockable();
64+
65+
private:
66+
std::condition_variable hp_cv_;
67+
std::condition_variable lp_cv_;
68+
std::mutex cv_mutex_;
69+
size_t hp_waiting_count_{0u};
70+
bool data_taken_{false};
71+
};
72+
73+
} // namespace detail
74+
} // namespace rclcpp
75+
76+
#endif // RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <thread>
2323
#include <unordered_map>
2424

25+
#include "rclcpp/detail/mutex_two_priorities.hpp"
2526
#include "rclcpp/executor.hpp"
2627
#include "rclcpp/macros.hpp"
2728
#include "rclcpp/memory_strategies.hpp"
@@ -81,12 +82,17 @@ class MultiThreadedExecutor : public rclcpp::Executor
8182
private:
8283
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
8384

84-
std::mutex wait_mutex_;
85+
std::mutex wait_mutex_; // Unused. Leave it for ABI compatibility.
8586
size_t number_of_threads_;
8687
bool yield_before_execute_;
8788
std::chrono::nanoseconds next_exec_timeout_;
8889

8990
std::set<TimerBase::SharedPtr> scheduled_timers_;
91+
static std::unordered_map<MultiThreadedExecutor *,
92+
std::shared_ptr<detail::MutexTwoPriorities>> wait_mutex_set_;
93+
static std::mutex shared_wait_mutex_;
94+
// These variables are declared as static variables for ABI-compatibiliity.
95+
// And they mimic member variables needed to backport from master.
9096
};
9197

9298
} // namespace executors
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2021 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "rclcpp/detail/mutex_two_priorities.hpp"
16+
17+
#include <mutex>
18+
19+
namespace rclcpp
20+
{
21+
namespace detail
22+
{
23+
24+
using LowPriorityLockable = MutexTwoPriorities::LowPriorityLockable;
25+
using HighPriorityLockable = MutexTwoPriorities::HighPriorityLockable;
26+
27+
HighPriorityLockable::HighPriorityLockable(MutexTwoPriorities & parent)
28+
: parent_(parent)
29+
{}
30+
31+
void
32+
HighPriorityLockable::lock()
33+
{
34+
std::unique_lock<std::mutex> guard{parent_.cv_mutex_};
35+
if (parent_.data_taken_) {
36+
++parent_.hp_waiting_count_;
37+
while (parent_.data_taken_) {
38+
parent_.hp_cv_.wait(guard);
39+
}
40+
--parent_.hp_waiting_count_;
41+
}
42+
parent_.data_taken_ = true;
43+
}
44+
45+
void
46+
HighPriorityLockable::unlock()
47+
{
48+
bool notify_lp{false};
49+
{
50+
std::lock_guard<std::mutex> guard{parent_.cv_mutex_};
51+
parent_.data_taken_ = false;
52+
notify_lp = 0u == parent_.hp_waiting_count_;
53+
}
54+
if (notify_lp) {
55+
parent_.lp_cv_.notify_one();
56+
} else {
57+
parent_.hp_cv_.notify_one();
58+
}
59+
}
60+
61+
LowPriorityLockable::LowPriorityLockable(MutexTwoPriorities & parent)
62+
: parent_(parent)
63+
{}
64+
65+
void
66+
LowPriorityLockable::lock()
67+
{
68+
std::unique_lock<std::mutex> guard{parent_.cv_mutex_};
69+
while (parent_.data_taken_ || parent_.hp_waiting_count_) {
70+
parent_.lp_cv_.wait(guard);
71+
}
72+
parent_.data_taken_ = true;
73+
}
74+
75+
void
76+
LowPriorityLockable::unlock()
77+
{
78+
bool notify_lp{false};
79+
{
80+
std::lock_guard<std::mutex> guard{parent_.cv_mutex_};
81+
parent_.data_taken_ = false;
82+
notify_lp = 0u == parent_.hp_waiting_count_;
83+
}
84+
if (notify_lp) {
85+
parent_.lp_cv_.notify_one();
86+
} else {
87+
parent_.hp_cv_.notify_one();
88+
}
89+
}
90+
91+
HighPriorityLockable
92+
MutexTwoPriorities::get_high_priority_lockable()
93+
{
94+
return HighPriorityLockable{*this};
95+
}
96+
97+
LowPriorityLockable
98+
MutexTwoPriorities::get_low_priority_lockable()
99+
{
100+
return LowPriorityLockable{*this};
101+
}
102+
103+
} // namespace detail
104+
} // namespace rclcpp

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,19 @@
1717
#include <chrono>
1818
#include <functional>
1919
#include <memory>
20+
#include <unordered_map>
2021
#include <vector>
2122

2223
#include "rclcpp/utilities.hpp"
2324
#include "rclcpp/scope_exit.hpp"
2425

26+
using rclcpp::detail::MutexTwoPriorities;
2527
using rclcpp::executors::MultiThreadedExecutor;
2628

29+
std::unordered_map<MultiThreadedExecutor *, std::shared_ptr<MutexTwoPriorities>>
30+
MultiThreadedExecutor::wait_mutex_set_;
31+
std::mutex MultiThreadedExecutor::shared_wait_mutex_;
32+
2733
MultiThreadedExecutor::MultiThreadedExecutor(
2834
const rclcpp::ExecutorOptions & options,
2935
size_t number_of_threads,
@@ -33,6 +39,11 @@ MultiThreadedExecutor::MultiThreadedExecutor(
3339
yield_before_execute_(yield_before_execute),
3440
next_exec_timeout_(next_exec_timeout)
3541
{
42+
{
43+
std::lock_guard<std::mutex> wait_lock(
44+
MultiThreadedExecutor::shared_wait_mutex_);
45+
wait_mutex_set_[this] = std::make_shared<MutexTwoPriorities>();
46+
}
3647
number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency();
3748
if (number_of_threads_ == 0) {
3849
number_of_threads_ = 1;
@@ -51,7 +62,9 @@ MultiThreadedExecutor::spin()
5162
std::vector<std::thread> threads;
5263
size_t thread_id = 0;
5364
{
54-
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
65+
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
66+
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
67+
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
5568
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
5669
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
5770
threads.emplace_back(func);
@@ -76,7 +89,9 @@ MultiThreadedExecutor::run(size_t)
7689
while (rclcpp::ok(this->context_) && spinning.load()) {
7790
rclcpp::AnyExecutable any_exec;
7891
{
79-
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
92+
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
93+
auto low_priority_wait_mutex = wait_mutex->get_low_priority_lockable();
94+
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
8095
if (!rclcpp::ok(this->context_) || !spinning.load()) {
8196
return;
8297
}
@@ -103,7 +118,9 @@ MultiThreadedExecutor::run(size_t)
103118
execute_any_executable(any_exec);
104119

105120
if (any_exec.timer) {
106-
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
121+
auto wait_mutex = MultiThreadedExecutor::wait_mutex_set_[this];
122+
auto high_priority_wait_mutex = wait_mutex->get_high_priority_lockable();
123+
std::lock_guard<MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
107124
auto it = scheduled_timers_.find(any_exec.timer);
108125
if (it != scheduled_timers_.end()) {
109126
scheduled_timers_.erase(it);

0 commit comments

Comments
 (0)