Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 94 additions & 50 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ServerBaseImpl
{
}

// Lock everything except user callbacks
// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

std::shared_ptr<rcl_action_server_t> action_server_;
Expand All @@ -58,10 +58,13 @@ class ServerBaseImpl
size_t num_services_ = 0;
size_t num_guard_conditions_ = 0;

bool goal_request_ready_ = false;
bool cancel_request_ready_ = false;
bool result_request_ready_ = false;
bool goal_expired_ = false;
std::atomic<bool> goal_request_ready_{false};
std::atomic<bool> cancel_request_ready_{false};
std::atomic<bool> result_request_ready_{false};
std::atomic<bool> goal_expired_{false};

// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;

// Results to be kept until the goal expires after reaching a terminal state
std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_;
Expand Down Expand Up @@ -171,29 +174,41 @@ ServerBase::add_to_wait_set(rcl_wait_set_t * wait_set)
bool
ServerBase::is_ready(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&pimpl_->goal_request_ready_,
&pimpl_->cancel_request_ready_,
&pimpl_->result_request_ready_,
&pimpl_->goal_expired_);
bool goal_request_ready;
bool cancel_request_ready;
bool result_request_ready;
bool goal_expired;
rcl_ret_t ret;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_server_wait_set_get_entities_ready(
wait_set,
pimpl_->action_server_.get(),
&goal_request_ready,
&cancel_request_ready,
&result_request_ready,
&goal_expired);
}

pimpl_->goal_request_ready_ = goal_request_ready;
pimpl_->cancel_request_ready_ = cancel_request_ready;
pimpl_->result_request_ready_ = result_request_ready;
pimpl_->goal_expired_ = goal_expired;

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

return pimpl_->goal_request_ready_ ||
pimpl_->cancel_request_ready_ ||
pimpl_->result_request_ready_ ||
pimpl_->goal_expired_;
return pimpl_->goal_request_ready_.load() ||
pimpl_->cancel_request_ready_.load() ||
pimpl_->result_request_ready_.load() ||
pimpl_->goal_expired_.load();
}

std::shared_ptr<void>
ServerBase::take_data()
{
if (pimpl_->goal_request_ready_) {
if (pimpl_->goal_request_ready_.load()) {
rcl_ret_t ret;
rcl_action_goal_info_t goal_info = rcl_action_get_zero_initialized_goal_info();
rmw_request_id_t request_header;
Expand All @@ -212,7 +227,7 @@ ServerBase::take_data()
ret,
goal_info,
request_header, message));
} else if (pimpl_->cancel_request_ready_) {
} else if (pimpl_->cancel_request_ready_.load()) {
rcl_ret_t ret;
rmw_request_id_t request_header;

Expand All @@ -229,7 +244,7 @@ ServerBase::take_data()
std::make_shared
<std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>>(ret, request, request_header));
} else if (pimpl_->result_request_ready_) {
} else if (pimpl_->result_request_ready_.load()) {
rcl_ret_t ret;
// Get the result request message
rmw_request_id_t request_header;
Expand All @@ -241,7 +256,7 @@ ServerBase::take_data()
return std::static_pointer_cast<void>(
std::make_shared<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(
ret, result_request, request_header));
} else if (pimpl_->goal_expired_) {
} else if (pimpl_->goal_expired_.load()) {
return nullptr;
} else {
throw std::runtime_error("Taking data from action server but nothing is ready");
Expand All @@ -251,17 +266,17 @@ ServerBase::take_data()
void
ServerBase::execute(std::shared_ptr<void> & data)
{
if (!data && !pimpl_->goal_expired_) {
if (!data && !pimpl_->goal_expired_.load()) {
throw std::runtime_error("'data' is empty");
}

if (pimpl_->goal_request_ready_) {
if (pimpl_->goal_request_ready_.load()) {
execute_goal_request_received(data);
} else if (pimpl_->cancel_request_ready_) {
} else if (pimpl_->cancel_request_ready_.load()) {
execute_cancel_request_received(data);
} else if (pimpl_->result_request_ready_) {
} else if (pimpl_->result_request_ready_.load()) {
execute_result_request_received(data);
} else if (pimpl_->goal_expired_) {
} else if (pimpl_->goal_expired_.load()) {
execute_check_expired_goals();
} else {
throw std::runtime_error("Executing action server but nothing is ready");
Expand All @@ -286,21 +301,24 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
rmw_request_id_t request_header = std::get<2>(*shared_ptr);
std::shared_ptr<void> message = std::get<3>(*shared_ptr);


std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

pimpl_->goal_request_ready_ = false;
bool expected = true;
if (!pimpl_->goal_request_ready_.compare_exchange_strong(expected, false)) {
return;
}

GoalUUID uuid = get_goal_id_from_goal_request(message.get());
convert(uuid, &goal_info);

// Call user's callback, getting the user's response and a ros message to send back
auto response_pair = call_handle_goal_callback(uuid, message);

ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
Expand All @@ -325,7 +343,10 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
}
};
rcl_action_goal_handle_t * rcl_handle;
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n");
}
Expand All @@ -334,7 +355,10 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
// Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle;

pimpl_->goal_handles_[uuid] = handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle;
}

if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing
Expand All @@ -359,7 +383,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
<std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -380,10 +403,14 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
// Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();

ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand Down Expand Up @@ -426,8 +453,12 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
publish_status();
}

ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand All @@ -440,7 +471,6 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
auto shared_ptr = std::static_pointer_cast
<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -460,7 +490,10 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) {
// Goal does not exists
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
Expand All @@ -474,13 +507,15 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)

if (result_response) {
// Send the result now
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t rcl_ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get());
if (RCL_RET_OK != rcl_ret) {
rclcpp::exceptions::throw_from_rcl_error(rcl_ret);
}
} else {
// Store the request so it can be responded to later
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->result_requests_[uuid].push_back(request_header);
}
data.reset();
Expand All @@ -495,16 +530,19 @@ ServerBase::execute_check_expired_goals()

// Loop in case more than 1 goal expired
while (num_expired > 0u) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret;
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
} else if (num_expired) {
// A goal expired!
GoalUUID uuid;
convert(expired_goals[0], &uuid);
RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str());
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_.erase(uuid);
pimpl_->result_requests_.erase(uuid);
pimpl_->goal_handles_.erase(uuid);
Expand Down Expand Up @@ -577,20 +615,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
// Check that the goal exists
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}

if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

pimpl_->goal_results_[uuid] = result_msg;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg;
}

// if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) {
for (auto & request_header : iter->second) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get());
if (RCL_RET_OK != ret) {
Expand Down
Loading