Skip to content

Commit

Permalink
Reduce overhead for inheriting from rclcpp::Executor when base functi…
Browse files Browse the repository at this point in the history
…onality is not reused (#2506)

* feat(clock): Added function to interrupt sleep

This is useful in case a second thread needs to wake up another
thread, that is sleeping using a clock.

Signed-off-by: Janosch Machowinski <[email protected]>

* feat: Added support for multi thread wait / shutdown

This adds support for multiple threads waiting on the same
clock, while an shutdown is invoked.

Signed-off-by: Janosch Machowinski <[email protected]>

* chore: Made Executor fully overloadable

This commit makes every public funciton virtual, and adds virtual impl
function for the existing template functions.

The goal of this commit is to be able to fully control the everything from
a derived class.

Signed-off-by: Janosch Machowinski <[email protected]>

* Update rclcpp/include/rclcpp/executor.hpp

Co-authored-by: William Woodall <[email protected]>
Signed-off-by: jmachowinski <[email protected]>

* Update rclcpp/include/rclcpp/executor.hpp

Co-authored-by: William Woodall <[email protected]>
Signed-off-by: jmachowinski <[email protected]>

* Update rclcpp/include/rclcpp/executor.hpp

Co-authored-by: William Woodall <[email protected]>
Signed-off-by: jmachowinski <[email protected]>

* Update rclcpp/include/rclcpp/executor.hpp

Co-authored-by: William Woodall <[email protected]>
Signed-off-by: jmachowinski <[email protected]>

* docs: added doc string for spin_until_future_complete_impl

Signed-off-by: Janosch Machowinski <[email protected]>

* made is_spinning not virtual

Signed-off-by: Janosch Machowinski <[email protected]>

* feat: Changed interface of spin_until_future_complete_impl

This change allows it to use a second thread to wait for
the future to become ready.

Signed-off-by: Janosch Machowinski <[email protected]>

* doc fixup

Signed-off-by: William Woodall <[email protected]>

* undo template->implicit conversion change

Signed-off-by: William Woodall <[email protected]>

* style

Signed-off-by: William Woodall <[email protected]>

---------

Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: jmachowinski <[email protected]>
Signed-off-by: William Woodall <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
Co-authored-by: jmachowinski <[email protected]>
  • Loading branch information
3 people authored Apr 16, 2024
1 parent 535d56f commit 6bb9407
Show file tree
Hide file tree
Showing 6 changed files with 390 additions and 73 deletions.
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ class Clock
bool
ros_time_is_active();

/**
* Cancels an ongoing or future sleep operation of one thread.
*
* This function can be used by one thread, to wakeup another thread that is
* blocked using any of the sleep_ or wait_ methods of this class.
*/
RCLCPP_PUBLIC
void
cancel_sleep_or_wait();

/// Return the rcl_clock_t clock handle
RCLCPP_PUBLIC
rcl_clock_t *
Expand Down
85 changes: 35 additions & 50 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ class Executor
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_some(std::shared_ptr<rclcpp::Node> node);

/// Collect work once and execute all available work, optionally within a max duration.
Expand Down Expand Up @@ -307,14 +307,14 @@ class Executor
* \param[in] node Shared pointer to the node to add.
*/
RCLCPP_PUBLIC
void
virtual void
spin_node_all(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds max_duration);

/// Convenience function which takes Node and forwards NodeBaseInterface.
RCLCPP_PUBLIC
void
virtual void
spin_node_all(std::shared_ptr<rclcpp::Node> node, std::chrono::nanoseconds max_duration);

/// Collect and execute work repeatedly within a duration or until no more work is available.
Expand Down Expand Up @@ -366,52 +366,12 @@ class Executor
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
return spin_until_future_complete_impl(
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout),
[&future](std::chrono::nanoseconds wait_time) {
return future.wait_for(wait_time);
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
);
}

/// Cancel any running spin* function, causing it to return.
Expand All @@ -420,7 +380,7 @@ class Executor
* \throws std::runtime_error if there is an issue triggering the guard condition
*/
RCLCPP_PUBLIC
void
virtual void
cancel();

/// Returns true if the executor is currently spinning.
Expand All @@ -433,6 +393,14 @@ class Executor
is_spinning();

protected:
/// Constructor that will not initialize any non-trivial members.
/**
* This constructor is intended to be used by any derived executor
* that explicitly does not want to use the default implementation provided
* by this class.
*/
explicit Executor(const std::shared_ptr<rclcpp::Context> & context);

/// Add a node to executor, execute the next available unit of work, and remove the node.
/**
* Implementation of spin_node_once using std::chrono::nanoseconds
Expand All @@ -447,6 +415,23 @@ class Executor
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
std::chrono::nanoseconds timeout);

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \sa spin_until_future_complete()
* The only difference with spin_until_future_complete() is that the future's
* type is obscured through a std::function which lets you wait on it
* reguardless of type.
*
* \param[in] timeout see spin_until_future_complete() for details
* \param[in] wait_for_future function to wait on the future and get the
* status after waiting
*/
RCLCPP_PUBLIC
virtual FutureReturnCode
spin_until_future_complete_impl(
std::chrono::nanoseconds timeout,
const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future);

/// Collect work and execute available work, optionally within a duration.
/**
* Implementation of spin_some and spin_all.
Expand Down
71 changes: 49 additions & 22 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Clock::Impl

rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
bool stop_sleeping_ = false;
bool shutdown_ = false;
std::condition_variable cv_;
std::mutex wait_mutex_;
std::mutex clock_mutex_;
};

Expand Down Expand Up @@ -79,8 +83,20 @@ Clock::now() const
return now;
}

void
Clock::cancel_sleep_or_wait()
{
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->stop_sleeping_ = true;
}
impl_->cv_.notify_one();
}

bool
Clock::sleep_until(Time until, Context::SharedPtr context)
Clock::sleep_until(
Time until,
Context::SharedPtr context)
{
if (!context || !context->is_valid()) {
throw std::runtime_error("context cannot be slept with because it's invalid");
Expand All @@ -91,12 +107,14 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
}
bool time_source_changed = false;

std::condition_variable cv;

// Wake this thread if the context is shutdown
rclcpp::OnShutdownCallbackHandle shutdown_cb_handle = context->add_on_shutdown_callback(
[&cv]() {
cv.notify_one();
[this]() {
{
std::unique_lock lock(impl_->wait_mutex_);
impl_->shutdown_ = true;
}
impl_->cv_.notify_one();
});
// No longer need the shutdown callback when this function exits
auto callback_remover = rcpputils::scope_exit(
Expand All @@ -112,22 +130,24 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
const std::chrono::steady_clock::time_point chrono_until =
chrono_entry + std::chrono::nanoseconds(delta_t.nanoseconds());

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, chrono_until);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, chrono_until);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_SYSTEM_TIME) {
auto system_time = std::chrono::system_clock::time_point(
// Cast because system clock resolution is too big for nanoseconds on some systems
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid()) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown or stop of sleep
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid()) {
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else if (this_clock_type == RCL_ROS_TIME) {
// Install jump handler for any amount of time change, for two purposes:
// - if ROS time is active, check if time reached on each new clock sample
Expand All @@ -139,11 +159,12 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
threshold.min_forward.nanoseconds = 1;
auto clock_handler = create_jump_callback(
nullptr,
[&cv, &time_source_changed](const rcl_time_jump_t & jump) {
[this, &time_source_changed](const rcl_time_jump_t & jump) {
if (jump.clock_change != RCL_ROS_TIME_NO_CHANGE) {
std::lock_guard<std::mutex> lk(impl_->wait_mutex_);
time_source_changed = true;
}
cv.notify_one();
impl_->cv_.notify_one();
},
threshold);

Expand All @@ -153,19 +174,25 @@ Clock::sleep_until(Time until, Context::SharedPtr context)
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(until.nanoseconds())));

// loop over spurious wakeups but notice shutdown or time source change
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait_until(lock, system_time);
// loop over spurious wakeups but notice shutdown, stop of sleep or time source change
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait_until(lock, system_time);
}
impl_->stop_sleeping_ = false;
} else {
// RCL_ROS_TIME with ros_time_is_active.
// Just wait without "until" because installed
// jump callbacks wake the cv on every new sample.
std::unique_lock lock(impl_->clock_mutex_);
while (now() < until && context->is_valid() && !time_source_changed) {
cv.wait(lock);
std::unique_lock lock(impl_->wait_mutex_);
while (now() < until && !impl_->stop_sleeping_ && !impl_->shutdown_ && context->is_valid() &&
!time_source_changed)
{
impl_->cv_.wait(lock);
}
impl_->stop_sleeping_ = false;
}
}

Expand Down
64 changes: 63 additions & 1 deletion rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ static constexpr rclcpp::SubscriptionWaitSetMask kDefaultSubscriptionMask = {tru

class rclcpp::ExecutorImplementation {};

Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
: spinning(false),
entities_need_rebuild_(true),
collector_(nullptr),
wait_set_({}, {}, {}, {}, {}, {}, context)
{
}

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
Expand Down Expand Up @@ -120,7 +128,8 @@ Executor::~Executor()
}
}

void Executor::trigger_entity_recollect(bool notify)
void
Executor::trigger_entity_recollect(bool notify)
{
this->entities_need_rebuild_.store(true);

Expand Down Expand Up @@ -240,6 +249,59 @@ Executor::spin_node_once_nanoseconds(
this->remove_node(node, false);
}

rclcpp::FutureReturnCode
Executor::spin_until_future_complete_impl(
std::chrono::nanoseconds timeout,
const std::function<std::future_status(std::chrono::nanoseconds wait_time)> & wait_for_future)
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = wait_for_future(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = wait_for_future(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

void
Executor::spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
{
Expand Down
4 changes: 4 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ ament_add_gtest(test_client test_client.cpp)
if(TARGET test_client)
target_link_libraries(test_client ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_clock test_clock.cpp)
if(TARGET test_clock)
target_link_libraries(test_clock ${PROJECT_NAME} mimick ${rcl_interfaces_TARGETS} ${test_msgs_TARGETS})
endif()
ament_add_gtest(test_copy_all_parameter_values test_copy_all_parameter_values.cpp)
if(TARGET test_copy_all_parameter_values)
target_link_libraries(test_copy_all_parameter_values ${PROJECT_NAME})
Expand Down
Loading

0 comments on commit 6bb9407

Please sign in to comment.