Skip to content

Commit bfad43b

Browse files
committed
Revert "More intuitive approach ..."
This reverts commit 6883d9a. Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent d2f4a91 commit bfad43b

File tree

2 files changed

+63
-20
lines changed

2 files changed

+63
-20
lines changed

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,66 @@ class MultiThreadedExecutor : public rclcpp::Executor
8181
private:
8282
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)
8383

84-
std::mutex wait_mutex_;
84+
/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
85+
/**
86+
* After the current mutex owner release the lock, a thread that used the high
87+
* priority mechanism will have priority over threads that used the low priority mechanism.
88+
*/
89+
class MutexTwoPriorities {
90+
public:
91+
class HpMutex
92+
{
93+
public:
94+
HpMutex(MutexTwoPriorities & parent) : parent_(parent) {}
95+
96+
void lock() {
97+
parent_.data_.lock();
98+
}
99+
100+
void unlock() {
101+
parent_.data_.unlock();
102+
}
103+
private:
104+
MutexTwoPriorities & parent_;
105+
};
106+
107+
class LpMutex
108+
{
109+
public:
110+
LpMutex(MutexTwoPriorities & parent) : parent_(parent) {}
111+
112+
void lock() {
113+
// low_prio_.lock(); data_.lock();
114+
std::unique_lock<std::mutex> lpg{parent_.low_prio_};
115+
parent_.data_.lock();
116+
lpg.release();
117+
}
118+
119+
void unlock() {
120+
// data_.unlock(); low_prio_.unlock()
121+
std::lock_guard<std::mutex> lpg{parent_.low_prio_, std::adopt_lock};
122+
parent_.data_.unlock();
123+
}
124+
private:
125+
MutexTwoPriorities & parent_;
126+
};
127+
128+
HpMutex hp() {return HpMutex{*this};}
129+
LpMutex lp() {return LpMutex{*this};}
130+
131+
private:
132+
// Implementation detail: the whole idea here is that only one low priority thread can be
133+
// trying to take the data_ mutex, while all high priority threads are already waiting there.
134+
std::mutex low_prio_;
135+
std::mutex data_;
136+
};
137+
138+
MutexTwoPriorities wait_mutex_;
85139
size_t number_of_threads_;
86140
bool yield_before_execute_;
87141
std::chrono::nanoseconds next_exec_timeout_;
88142

89143
std::set<TimerBase::SharedPtr> scheduled_timers_;
90-
std::mutex scheduled_timers_mutex_;
91144
};
92145

93146
} // namespace executors

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,16 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
4444
void
4545
MultiThreadedExecutor::spin()
4646
{
47+
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
4748
if (spinning.exchange(true)) {
4849
throw std::runtime_error("spin() called while already spinning");
4950
}
5051
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
5152
std::vector<std::thread> threads;
5253
size_t thread_id = 0;
5354
{
54-
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
55+
auto lp_wait_mutex = wait_mutex_.lp();
56+
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
5557
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
5658
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
5759
threads.emplace_back(func);
@@ -73,10 +75,12 @@ MultiThreadedExecutor::get_number_of_threads()
7375
void
7476
MultiThreadedExecutor::run(size_t)
7577
{
78+
using MutexTwoPriorities = rclcpp::executors::MultiThreadedExecutor::MutexTwoPriorities;
7679
while (rclcpp::ok(this->context_) && spinning.load()) {
7780
rclcpp::AnyExecutable any_exec;
7881
{
79-
std::lock_guard<std::mutex> wait_guard{wait_mutex_};
82+
auto lp_wait_mutex = wait_mutex_.lp();
83+
std::lock_guard<MutexTwoPriorities::LpMutex> wait_lock(lp_wait_mutex);
8084
if (!rclcpp::ok(this->context_) || !spinning.load()) {
8185
return;
8286
}
@@ -85,13 +89,6 @@ MultiThreadedExecutor::run(size_t)
8589
}
8690
if (any_exec.timer) {
8791
// Guard against multiple threads getting the same timer.
88-
89-
// THIS MUST BE DONE WITH BOTH THE WAIT_MUTEX AND SCHEDULED TIMERS MUTEX TAKEN!!
90-
// It might seem unnecessary, but you avoid the following race:
91-
// Thread A: get timer, context switch.
92-
// Thread B: get timer, insert scheduled timer, execute timer, remove scheduled timer.
93-
// Thread A: execute timer.
94-
std::lock_guard<std::mutex> scheduled_timers_guard{scheduled_timers_mutex_};
9592
if (scheduled_timers_.count(any_exec.timer) != 0) {
9693
// Make sure that any_exec's callback group is reset before
9794
// the lock is released.
@@ -110,15 +107,8 @@ MultiThreadedExecutor::run(size_t)
110107
execute_any_executable(any_exec);
111108

112109
if (any_exec.timer) {
113-
// DON'T DELETE THE scheduled_timers_mutex_ AND REPLACE IT WITH THE wait_mutex_ here.
114-
// Now, this mutex will only compete with ONE worker thread that's is trying to insert
115-
// a timer.
116-
// (and also, N other worker threads also trying to delete a timer).
117-
// If the wait_mutex_ is used here, this will compete with all the other worker threads!!!
118-
// If we're waiting too long too remove the scheduled timer, maybe we are discarding a timer
119-
// execution that we shouldn't!
120-
// Of course, this can still happen if the callback is too long ...
121-
std::lock_guard<std::mutex> scheduled_timers_guard{scheduled_timers_mutex_};
110+
auto hp_wait_mutex = wait_mutex_.hp();
111+
std::lock_guard<MutexTwoPriorities::HpMutex> wait_lock(hp_wait_mutex);
122112
auto it = scheduled_timers_.find(any_exec.timer);
123113
if (it != scheduled_timers_.end()) {
124114
scheduled_timers_.erase(it);

0 commit comments

Comments
 (0)