Skip to content

Commit

Permalink
Fixup Executor::spin_all() regression fix (#2517)
Browse files Browse the repository at this point in the history
* test(Executors): Added tests for busy waiting

Checks if executors are busy waiting while they should block
in spin_some or spin_all.

Signed-off-by: Janosch Machowinski <[email protected]>

* fix: Reworked spinAll test

This test was strange. It looked like, it assumed that spin_all did
not return instantly. Also it was racy, as the thread could terminate
instantly.

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(Executor): Fixed spin_all not returning instantly is no work was available

Signed-off-by: Janosch Machowinski <[email protected]>

* Update rclcpp/test/rclcpp/executors/test_executors.cpp

Co-authored-by: Tomoya Fujita <[email protected]>
Signed-off-by: jmachowinski <[email protected]>

* test(executors): Added test for busy waiting while calling spin

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(executor): Reset wait_result on every call to spin_some_impl

Before, the method would not recollect available work in case of
spin_some, spin_all. This would lead to the method behaving differently
than to what the documentation states.

Signed-off-by: Janosch Machowinski <[email protected]>

* restore previous test logic for now

Signed-off-by: William Woodall <[email protected]>

* refactor spin_some_impl's logic and improve busy wait tests

Signed-off-by: William Woodall <[email protected]>

* added some more comments about the implementation

Signed-off-by: William Woodall <[email protected]>

---------

Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: jmachowinski <[email protected]>
Signed-off-by: William Woodall <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
Co-authored-by: jmachowinski <[email protected]>
Co-authored-by: Tomoya Fujita <[email protected]>
  • Loading branch information
4 people authored May 2, 2024
1 parent 5f912eb commit f7185dc
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 11 deletions.
50 changes: 39 additions & 11 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,24 +366,52 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );

// clear the wait result and wait for work without blocking to collect the work
// for the first time
// both spin_some and spin_all wait for work at the beginning
wait_result_.reset();
wait_for_work(std::chrono::milliseconds(0));
bool just_waited = true;

// The logic of this while loop is as follows:
//
// - while not shutdown, and spinning (not canceled), and not max duration reached...
// - try to get an executable item to execute, and execute it if available
// - otherwise, reset the wait result, and ...
// - if there was no work available just after waiting, break the loop unconditionally
// - this is appropriate for both spin_some and spin_all which use this function
// - else if exhaustive = true, then wait for work again
// - this is only used for spin_all and not spin_some
// - else break
// - this only occurs with spin_some
//
// The logic of this loop is subtle and should be carefully changed if at all.
// See also:
// https://github.com/ros2/rclcpp/issues/2508
// https://github.com/ros2/rclcpp/pull/2517
while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) {
if (!wait_result_.has_value()) {
wait_for_work(std::chrono::milliseconds(0));
}

AnyExecutable any_exec;
if (get_next_ready_executable(any_exec)) {
execute_any_executable(any_exec);
just_waited = false;
} else {
// If nothing is ready, reset the result to signal we are
// ready to wait again
// if nothing is ready, reset the result to clear it
wait_result_.reset();
}

if (!wait_result_.has_value() && !exhaustive) {
// In the case of spin some, then we can exit
// In the case of spin all, then we will allow ourselves to wait again.
break;
if (just_waited) {
// there was no work after just waiting, always exit in this case
// before the exhaustive condition can be checked
break;
}

if (exhaustive) {
// if exhaustive, wait for work again
// this only happens for spin_all; spin_some only waits at the start
wait_for_work(std::chrono::milliseconds(0));
just_waited = true;
} else {
break;
}
}
}
}
Expand Down
160 changes: 160 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ class TestWaitable : public rclcpp::Waitable
bool
is_ready(const rcl_wait_set_t & wait_set) override
{
is_ready_count_++;
for (size_t i = 0; i < wait_set.size_of_guard_conditions; ++i) {
auto rcl_guard_condition = wait_set.guard_conditions[i];
if (&gc_.get_rcl_guard_condition() == rcl_guard_condition) {
Expand Down Expand Up @@ -424,8 +425,15 @@ class TestWaitable : public rclcpp::Waitable
return count_;
}

size_t
get_is_ready_call_count() const
{
return is_ready_count_;
}

private:
std::atomic<size_t> trigger_count_ = 0;
std::atomic<size_t> is_ready_count_ = 0;
std::atomic<size_t> count_ = 0;
rclcpp::GuardCondition gc_;
std::function<void()> on_execute_callback_ = nullptr;
Expand Down Expand Up @@ -869,3 +877,155 @@ TEST(TestExecutors, testSpinWithNonDefaultContext)

rclcpp::shutdown(non_default_context);
}

template<typename T>
class TestBusyWaiting : public ::testing::Test
{
public:
void SetUp() override
{
rclcpp::init(0, nullptr);

const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
node = std::make_shared<rclcpp::Node>("node", test_name.str());
callback_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive,
/* automatically_add_to_executor_with_node =*/ false);

auto waitable_interfaces = node->get_node_waitables_interface();
waitable = std::make_shared<TestWaitable>();
waitable_interfaces->add_waitable(waitable, callback_group);

executor = std::make_shared<T>();
executor->add_callback_group(callback_group, node->get_node_base_interface());
}

void TearDown() override
{
rclcpp::shutdown();
}

void
set_up_and_trigger_waitable(std::function<void()> extra_callback = nullptr)
{
this->has_executed = false;
this->waitable->set_on_execute_callback([this, extra_callback]() {
if (!this->has_executed) {
// trigger once to see if the second trigger is handled or not
// this follow up trigger simulates new entities becoming ready while
// the executor is executing something else, e.g. subscription got data
// or a timer expired, etc.
// spin_some would not handle this second trigger, since it collects
// work only once, whereas spin_all should handle it since it
// collects work multiple times
this->waitable->trigger();
this->has_executed = true;
}
if (nullptr != extra_callback) {
extra_callback();
}
});
this->waitable->trigger();
}

void
check_for_busy_waits(std::chrono::steady_clock::time_point start_time)
{
// rough time based check, since the work to be done was very small it
// should be safe to check that we didn't use more than half the
// max duration, which itself is much larger than necessary
// however, it could still produce a false-positive
EXPECT_LT(
std::chrono::steady_clock::now() - start_time,
max_duration / 2)
<< "executor took a long time to execute when it should have done "
<< "nothing and should not have blocked either, but this could be a "
<< "false negative if the computer is really slow";

// this check is making some assumptions about the implementation of the
// executors, but it should be safe to say that a busy wait may result in
// hundreds or thousands of calls to is_ready(), but "normal" executor
// behavior should be within an order of magnitude of the number of
// times that the waitable was executed
ASSERT_LT(waitable->get_is_ready_call_count(), 10u * this->waitable->get_count());
}

static constexpr auto max_duration = 10s;

rclcpp::Node::SharedPtr node;
rclcpp::CallbackGroup::SharedPtr callback_group;
std::shared_ptr<TestWaitable> waitable;
std::chrono::steady_clock::time_point start_time;
std::shared_ptr<T> executor;
bool has_executed;
};

TYPED_TEST_SUITE(TestBusyWaiting, ExecutorTypes, ExecutorTypeNames);

TYPED_TEST(TestBusyWaiting, test_spin_all)
{
this->set_up_and_trigger_waitable();

auto start_time = std::chrono::steady_clock::now();
this->executor->spin_all(this->max_duration);
this->check_for_busy_waits(start_time);
// this should get the initial trigger, and the follow up from in the callback
ASSERT_EQ(this->waitable->get_count(), 2u);
}

TYPED_TEST(TestBusyWaiting, test_spin_some)
{
this->set_up_and_trigger_waitable();

auto start_time = std::chrono::steady_clock::now();
this->executor->spin_some(this->max_duration);
this->check_for_busy_waits(start_time);
// this should get the inital trigger, but not the follow up in the callback
ASSERT_EQ(this->waitable->get_count(), 1u);
}

TYPED_TEST(TestBusyWaiting, test_spin)
{
std::condition_variable cv;
std::mutex cv_m;
bool first_check_passed = false;

this->set_up_and_trigger_waitable([&cv, &cv_m, &first_check_passed]() {
cv.notify_one();
if (!first_check_passed) {
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 1s, [&]() {return first_check_passed;});
}
});

auto start_time = std::chrono::steady_clock::now();
std::thread t([this]() {
this->executor->spin();
});

// wait until thread has started (first execute of waitable)
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 10s);
}
EXPECT_GT(this->waitable->get_count(), 0u);

first_check_passed = true;
cv.notify_one();

// wait until the executor has finished (second execute of waitable)
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait_for(lk, 10s);
}
EXPECT_EQ(this->waitable->get_count(), 2u);

this->executor->cancel();
t.join();

this->check_for_busy_waits(start_time);
// this should get the initial trigger, and the follow up from in the callback
ASSERT_EQ(this->waitable->get_count(), 2u);
}

0 comments on commit f7185dc

Please sign in to comment.