Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 137 additions & 69 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class ServerBaseImpl
{
}

// Lock everything except user callbacks
std::recursive_mutex reentrant_mutex_;
// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

std::shared_ptr<rcl_action_server_t> action_server_;

Expand All @@ -56,10 +56,13 @@ class ServerBaseImpl
size_t num_services_ = 0;
size_t num_guard_conditions_ = 0;

bool goal_request_ready_ = false;
bool cancel_request_ready_ = false;
bool result_request_ready_ = false;
bool goal_expired_ = false;
std::atomic<bool> goal_request_ready_{false};
std::atomic<bool> cancel_request_ready_{false};
std::atomic<bool> result_request_ready_{false};
std::atomic<bool> goal_expired_{false};

// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;

// Results to be kept until the goal expires after reaching a terminal state
std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_;
Expand Down Expand Up @@ -88,12 +91,13 @@ ServerBase::ServerBase(
if (nullptr != ptr) {
rcl_node_t * rcl_node = node_base->get_rcl_node_handle();
rcl_ret_t ret = rcl_action_server_fini(ptr, rcl_node);
(void)ret;
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter");
if (RCL_RET_OK != ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_server_t in deleter");
}
delete ptr;
}
delete ptr;
};

pimpl_->action_server_.reset(new rcl_action_server_t, deleter);
Expand Down Expand Up @@ -159,7 +163,7 @@ ServerBase::get_number_of_ready_guard_conditions()
bool
ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_wait_set_add_action_server(
wait_set, pimpl_->action_server_.get(), NULL);
return RCL_RET_OK == ret;
Expand All @@ -168,35 +172,47 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
bool
ServerBase::is_ready(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
rcl_ret_t ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&pimpl_->goal_request_ready_,
&pimpl_->cancel_request_ready_,
&pimpl_->result_request_ready_,
&pimpl_->goal_expired_);
bool goal_request_ready;
bool cancel_request_ready;
bool result_request_ready;
bool goal_expired;
rcl_ret_t ret;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&goal_request_ready,
&cancel_request_ready,
&result_request_ready,
&goal_expired);
}

pimpl_->goal_request_ready_ = goal_request_ready;
pimpl_->cancel_request_ready_ = cancel_request_ready;
pimpl_->result_request_ready_ = result_request_ready;
pimpl_->goal_expired_ = goal_expired;

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

return pimpl_->goal_request_ready_ ||
pimpl_->cancel_request_ready_ ||
pimpl_->result_request_ready_ ||
pimpl_->goal_expired_;
return pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_.load();
}

void
ServerBase::execute()
{
if (pimpl_->goal_request_ready_) {
if (pimpl_->goal_request_ready_.load()) {
execute_goal_request_received();
} else if (pimpl_->cancel_request_ready_) {
} else if (pimpl_->cancel_request_ready_.load()) {
execute_cancel_request_received();
} else if (pimpl_->result_request_ready_) {
} else if (pimpl_->result_request_ready_.load()) {
execute_result_request_received();
} else if (pimpl_->goal_expired_) {
} else if (pimpl_->goal_expired_.load()) {
execute_check_expired_goals();
} else {
throw std::runtime_error("Executing action server but nothing is ready");
Expand All @@ -210,15 +226,19 @@ ServerBase::execute_goal_request_received()
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header;

std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);

std::shared_ptr<void> message = create_goal_request();
ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(),
&request_header,
message.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_goal_request(
pimpl_->action_server_.get(),
&request_header,
message.get());
}

pimpl_->goal_request_ready_ = false;
bool expected = true;
if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
Expand All @@ -235,10 +255,13 @@ ServerBase::execute_goal_request_received()
// Call user's callback, getting the user's response and a ros message to send back
auto response_pair = call_handle_goal_callback(uuid, message);

ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
Expand All @@ -254,15 +277,19 @@ ServerBase::execute_goal_request_received()
{
if (nullptr != ptr) {
rcl_ret_t fail_ret = rcl_action_goal_handle_fini(ptr);
(void)fail_ret;
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter");
if (RCL_RET_OK != fail_ret) {
RCLCPP_DEBUG(
rclcpp::get_logger("rclcpp_action"),
"failed to fini rcl_action_goal_handle_t in deleter");
}
delete ptr;
}
};
rcl_action_goal_handle_t * rcl_handle;
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n");
}
Expand All @@ -271,7 +298,11 @@ ServerBase::execute_goal_request_received()
// Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle;

pimpl_->goal_handles_[uuid] = handle;

{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle;
}

if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing
Expand All @@ -297,13 +328,18 @@ ServerBase::execute_cancel_request_received()
// Initialize cancel request
auto request = std::make_shared<action_msgs::srv::CancelGoal::Request>();

std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(),
&request_header,
request.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_cancel_request(
pimpl_->action_server_.get(),
&request_header,
request.get());
}

pimpl_->cancel_request_ready_ = false;
bool expected = true;
if (!pimpl_->cancel_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
Expand All @@ -323,10 +359,14 @@ ServerBase::execute_cancel_request_received()
// Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();

ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand Down Expand Up @@ -369,8 +409,12 @@ ServerBase::execute_cancel_request_received()
publish_status();
}

ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand All @@ -383,11 +427,16 @@ ServerBase::execute_result_request_received()
// Get the result request message
rmw_request_id_t request_header;
std::shared_ptr<void> result_request = create_result_request();
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_take_result_request(
pimpl_->action_server_.get(), &request_header, result_request.get());
}

pimpl_->result_request_ready_ = false;
bool expected = true;
if (!pimpl_->result_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
Expand All @@ -405,7 +454,10 @@ ServerBase::execute_result_request_received()
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) {
// Goal does not exists
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
Expand All @@ -419,13 +471,15 @@ ServerBase::execute_result_request_received()

if (result_response) {
// Send the result now
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
} else {
// Store the request so it can be responded to later
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->result_requests_[uuid].push_back(request_header);
}
}
Expand All @@ -439,16 +493,19 @@ ServerBase::execute_check_expired_goals()

// Loop in case more than 1 goal expired
while (num_expired > 0u) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
rcl_ret_t ret;
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
} else if (num_expired) {
// A goal expired!
GoalUUID uuid;
convert(expired_goals[0], &uuid);
RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str());
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_.erase(uuid);
pimpl_->result_requests_.erase(uuid);
pimpl_->goal_handles_.erase(uuid);
Expand All @@ -461,6 +518,11 @@ ServerBase::publish_status()
{
rcl_ret_t ret;

// We need to hold the lock across this entire method because
// rcl_action_server_get_goal_handles() returns an internal pointer to the
// goal data.
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

// Get all goal handles known to C action server
rcl_action_goal_handle_t ** goal_handles = NULL;
size_t num_goals = 0;
Expand Down Expand Up @@ -516,20 +578,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
// Check that the goal exists
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}

if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

pimpl_->goal_results_[uuid] = result_msg;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg;
}

// if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) {
for (auto & request_header : iter->second) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get());
if (RCL_RET_OK != ret) {
Expand All @@ -542,7 +610,7 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
void
ServerBase::notify_goal_terminal_state()
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_notify_goal_done(pimpl_->action_server_.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
Expand All @@ -552,7 +620,7 @@ ServerBase::notify_goal_terminal_state()
void
ServerBase::publish_feedback(std::shared_ptr<void> feedback_msg)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->reentrant_mutex_);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_publish_feedback(pimpl_->action_server_.get(), feedback_msg.get());
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback");
Expand Down