Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "Add support for spin_until_timeout (#1821)"" #1957

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,11 @@ class Client : public ClientBase
/// Send a request to the service server.
/**
* This method returns a `FutureAndRequestId` instance
* that can be passed to Executor::spin_until_future_complete() to
* that can be passed to Executor::spin_until_complete() to
* wait until it has been completed.
*
* If the future never completes,
* e.g. the call to Executor::spin_until_future_complete() times out,
* e.g. the call to Executor::spin_until_complete() times out,
* Client::remove_pending_request() must be called to clean the client internal state.
* Not doing so will make the `Client` instance to use more memory each time a response is not
* received from the service server.
Expand All @@ -592,7 +592,7 @@ class Client : public ClientBase
* auto future = client->async_send_request(my_request);
* if (
* rclcpp::FutureReturnCode::TIMEOUT ==
* executor->spin_until_future_complete(future, timeout))
* executor->spin_until_complete(future, timeout))
* {
* client->remove_pending_request(future);
* // handle timeout
Expand Down
143 changes: 97 additions & 46 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <mutex>
#include <string>
#include <vector>
#include <type_traits>
fujitatomoya marked this conversation as resolved.
Show resolved Hide resolved

#include "rcl/guard_condition.h"
#include "rcl/wait.h"
Expand Down Expand Up @@ -319,6 +320,51 @@ class Executor
virtual void
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// Spin (blocking) until the condition is complete, it times out waiting,
/// or rclcpp is interrupted.
/**
* \param[in] future The condition which can be callable or future type to wait on.
* If this function returns SUCCESS, the future can be
* accessed without blocking (though it may still throw an exception).
* \param[in] timeout Optional timeout parameter, which gets passed to Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return
* code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
FutureReturnCode
spin_until_complete(
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
if constexpr (std::is_invocable_v<ConditionT>) {
using RetT = std::invoke_result_t<ConditionT>;
static_assert(
std::is_same_v<bool, RetT>,
"Conditional callable has to return boolean type");
return spin_until_complete_impl(condition, timeout);
} else {
auto check_future = [&condition]() {
return condition.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready;
};
return spin_until_complete_impl(check_future, timeout);
}
}

/// Spin (blocking) for at least the given amount of duration.
/**
* \param[in] duration gets passed to Executor::spin_node_once,
* spins the executor for given duration.
*/
template<typename DurationT>
void
spin_for(DurationT duration)
{
(void)spin_until_complete([]() {return false;}, duration);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] future The future to wait on. If this function returns SUCCESS, the future can be
Expand All @@ -330,57 +376,13 @@ class Executor
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated("use spin_until_complete(const ConditionT & condition, DurationT timeout) instead")]]
FutureReturnCode
spin_until_future_complete(
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.

// Check the future before entering the while loop.
// If the future is already complete, don't try to spin.
std::future_status status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}

auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

// Check if the future is set, return SUCCESS if it is.
status = future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The future did not complete before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
return spin_until_complete(future, timeout);
}

/// Cancel any running spin* function, causing it to return.
Expand Down Expand Up @@ -560,6 +562,55 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

protected:
// Implementation details, used by spin_until_complete and spin_for.
// Previouse implementation of spin_until_future_complete.
template<typename ConditionT, typename DurationT>
FutureReturnCode
spin_until_complete_impl(ConditionT condition, DurationT timeout)
{
auto end_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
std::chrono::nanoseconds timeout_left = timeout_ns;

// Preliminary check, finish if conditon is done already.
if (condition()) {
return FutureReturnCode::SUCCESS;
}

if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);

if (condition()) {
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
auto now = std::chrono::steady_clock::now();
if (now >= end_time) {
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}

// The condition did not pass before ok() returned false, return INTERRUPTED.
return FutureReturnCode::INTERRUPTED;
}

public:
typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
Expand Down
95 changes: 86 additions & 9 deletions rclcpp/include/rclcpp/executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,50 @@ namespace executors
using rclcpp::executors::MultiThreadedExecutor;
using rclcpp::executors::SingleThreadedExecutor;

/// Spin (blocking) until the conditon is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
* \param[in] node_ptr The node to spin.
* \param[in] condition The callable or future to wait on. If `SUCCESS`, the condition is safe to
* access after this function
* \param[in] timeout Optional timeout parameter, which gets passed to
* Executor::spin_node_once.
* `-1` is block forever, `0` is non-blocking.
* If the time spent inside the blocking loop exceeds this timeout, return a `TIMEOUT` return code.
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_complete(condition, timeout);
executor.remove_node(node_ptr);
return retcode;
}

template<typename NodeT = rclcpp::Node, typename ConditionT,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_node_until_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
condition,
timeout);
}

/// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
/**
* \param[in] executor The executor which will spin the node.
Expand All @@ -67,31 +111,34 @@ using rclcpp::executors::SingleThreadedExecutor;
* \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`.
*/
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated(
"use spin_node_until_complete(Executor &, node_interfaces::NodeBaseInterface::SharedPtr, "
"const ConditionT &, DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
// inside a callback executed by an executor.
executor.add_node(node_ptr);
auto retcode = executor.spin_until_future_complete(future, timeout);
executor.remove_node(node_ptr);
return retcode;
return spin_until_complete(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated(
"use spin_node_until_complete(Executor &, std::shared_ptr<NodeT>, "
"const ConditionT &, DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_node_until_future_complete(
rclcpp::Executor & executor,
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::executors::spin_node_until_future_complete(
return rclcpp::executors::spin_node_until_complete(
executor,
node_ptr->get_node_base_interface(),
future,
Expand All @@ -100,26 +147,56 @@ spin_node_until_future_complete(

} // namespace executors

template<typename ConditionT, typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_complete<ConditionT>(executor, node_ptr, condition, timeout);
}

template<typename NodeT = rclcpp::Node, typename ConditionT,
typename DurationT = std::chrono::milliseconds>
rclcpp::FutureReturnCode
spin_until_complete(
std::shared_ptr<NodeT> node_ptr,
const ConditionT & condition,
DurationT timeout = DurationT(-1))
{
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), condition, timeout);
}

template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
[[deprecated(
"use spin_until_complete(node_interfaces::NodeBaseInterface::SharedPtr, "
"const ConditionT &,DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_until_future_complete(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
rclcpp::executors::SingleThreadedExecutor executor;
return executors::spin_node_until_future_complete<FutureT>(executor, node_ptr, future, timeout);
return executors::spin_node_until_complete<FutureT>(executor, node_ptr, future, timeout);
}

template<typename NodeT = rclcpp::Node, typename FutureT, typename TimeRepT = int64_t,
typename TimeT = std::milli>
[[deprecated(
"use spin_until_complete(std::shared_ptr<NodeT>, const ConditionT &, "
"DurationT) instead"
)]]
rclcpp::FutureReturnCode
spin_until_future_complete(
std::shared_ptr<NodeT> node_ptr,
const FutureT & future,
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
{
return rclcpp::spin_until_future_complete(node_ptr->get_node_base_interface(), future, timeout);
return rclcpp::spin_until_complete(node_ptr->get_node_base_interface(), future, timeout);
}

} // namespace rclcpp
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/future_return_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace rclcpp
{

/// Return codes to be used with spin_until_future_complete.
/// Return codes to be used with spin_until_complete.
/**
* SUCCESS: The future is complete and can be accessed with "get" without blocking.
* This does not indicate that the operation succeeded; "get" may still throw an exception.
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/rclcpp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
* - Executors (responsible for execution of callbacks through a blocking spin):
* - rclcpp::spin()
* - rclcpp::spin_some()
* - rclcpp::spin_until_future_complete()
* - rclcpp::spin_until_complete()
* - rclcpp::executors::SingleThreadedExecutor
* - rclcpp::executors::SingleThreadedExecutor::add_node()
* - rclcpp::executors::SingleThreadedExecutor::spin()
Expand Down
Loading