Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions rclcpp_action/src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ServerBaseImpl
{
}

// Lock everything except user callbacks
// Lock for action_server_
std::recursive_mutex action_server_reentrant_mutex_;

std::shared_ptr<rcl_action_server_t> action_server_;
Expand All @@ -63,6 +63,9 @@ class ServerBaseImpl
bool result_request_ready_ = false;
bool goal_expired_ = false;

// Lock for unordered_maps
std::recursive_mutex unordered_map_mutex_;

// Results to be kept until the goal expires after reaching a terminal state
std::unordered_map<GoalUUID, std::shared_ptr<void>> goal_results_;
// Requests for results are kept until a result becomes available
Expand Down Expand Up @@ -286,9 +289,6 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
rmw_request_id_t request_header = std::get<2>(*shared_ptr);
std::shared_ptr<void> message = std::get<3>(*shared_ptr);


std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);

pimpl_->goal_request_ready_ = false;

GoalUUID uuid = get_goal_id_from_goal_request(message.get());
Expand All @@ -297,10 +297,13 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
// Call user's callback, getting the user's response and a ros message to send back
auto response_pair = call_handle_goal_callback(uuid, message);

ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_goal_response(
pimpl_->action_server_.get(),
&request_header,
response_pair.second.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
Expand All @@ -325,7 +328,10 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
}
};
rcl_action_goal_handle_t * rcl_handle;
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_handle = rcl_action_accept_new_goal(pimpl_->action_server_.get(), &goal_info);
}
if (!rcl_handle) {
throw std::runtime_error("Failed to accept new goal\n");
}
Expand All @@ -334,7 +340,10 @@ ServerBase::execute_goal_request_received(std::shared_ptr<void> & data)
// Copy out goal handle since action server storage disappears when it is fini'd
*handle = *rcl_handle;

pimpl_->goal_handles_[uuid] = handle;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_handles_[uuid] = handle;
}
Comment on lines +340 to +361

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this particular lock here. At least, this lock is meant to protect the action_server, and that is not being accessed here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pimpl_->goal_handles_.erase(uuid);

goal_handles_ can be changed simultaneously, so I put a lock guard here too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry to be a bit pedantic here, but the name of the lock is currently action_server_reentrant_mutex. As a reader of the code, then, I expect that that lock will protect the action_server, and not necessarily protect anything else.

But I do agree that the goal_handles could be changed by two separate threads. There are 2 ways we can go with this:

  1. Rename the lock to pimpl_lock or something like that. At that point, it makes more sense that the lock protects the entire PIMPL structure. But we would also need to audit the rest of the code to make sure that the lock is being held anytime anything in the structure is being modified.
  2. Introduce another lock specifically to protect the goal_handles, and lock that as appropriate.

I'm honestly not sure which is better.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed this message and have made a change, but I added another mutex for unordered maps including goal_handles, because the pimpl structure has three groups of variables, 1) action_server and its attributes (size_t, bool) captured from actsion_server, 2) three unordered maps, and 3) references (clock and logger).
I think we don't need to lock the entire pimpl structure.

One line I'm not sure about is updating goal_request_ready_ without a lock.
All size_t and bool variables are protected because they are updated with action_server.

pimpl_->goal_request_ready_ = false;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One line I'm not sure about is updating goal_request_ready_ without a lock.

Good question. It should be atomic since it is just updating a boolean, but that is not guaranteed. One thing we could do there is make it a std::atomic<bool>, which would guarantee it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I changed it to std::atomic<bool>.


if (GoalResponse::ACCEPT_AND_EXECUTE == status) {
// Change status to executing
Expand All @@ -359,7 +368,6 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
<std::tuple<rcl_ret_t, std::shared_ptr<action_msgs::srv::CancelGoal::Request>,
rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -380,10 +388,14 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
// Get a list of goal info that should be attempted to be cancelled
rcl_action_cancel_response_t cancel_response = rcl_action_get_zero_initialized_cancel_response();

ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_process_cancel_request(
pimpl_->action_server_.get(),
&cancel_request,
&cancel_response);
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand Down Expand Up @@ -426,8 +438,12 @@ ServerBase::execute_cancel_request_received(std::shared_ptr<void> & data)
publish_status();
}

ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_send_cancel_response(
pimpl_->action_server_.get(), &request_header, response.get());
}

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
Expand All @@ -440,7 +456,6 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
auto shared_ptr = std::static_pointer_cast
<std::tuple<rcl_ret_t, std::shared_ptr<void>, rmw_request_id_t>>(data);
auto ret = std::get<0>(*shared_ptr);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
if (RCL_RET_ACTION_SERVER_TAKE_FAILED == ret) {
// Ignore take failure because connext fails if it receives a sample without valid data.
// This happens when a client shuts down and connext receives a sample saying the client is
Expand All @@ -460,7 +475,10 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}
if (!goal_exists) {
// Goal does not exists
result_response = create_result_response(action_msgs::msg::GoalStatus::STATUS_UNKNOWN);
Expand All @@ -474,13 +492,15 @@ ServerBase::execute_result_request_received(std::shared_ptr<void> & data)

if (result_response) {
// Send the result now
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t rcl_ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_response.get());
if (RCL_RET_OK != rcl_ret) {
rclcpp::exceptions::throw_from_rcl_error(rcl_ret);
}
} else {
// Store the request so it can be responded to later
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->result_requests_[uuid].push_back(request_header);
}
data.reset();
Expand All @@ -495,16 +515,19 @@ ServerBase::execute_check_expired_goals()

// Loop in case more than 1 goal expired
while (num_expired > 0u) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret;
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
ret = rcl_action_expire_goals(pimpl_->action_server_.get(), expired_goals, 1, &num_expired);
}
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
} else if (num_expired) {
// A goal expired!
GoalUUID uuid;
convert(expired_goals[0], &uuid);
RCLCPP_DEBUG(pimpl_->logger_, "Expired goal %s", to_string(uuid).c_str());
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_.erase(uuid);
pimpl_->result_requests_.erase(uuid);
pimpl_->goal_handles_.erase(uuid);
Expand Down Expand Up @@ -577,20 +600,26 @@ ServerBase::publish_result(const GoalUUID & uuid, std::shared_ptr<void> result_m
// Check that the goal exists
rcl_action_goal_info_t goal_info;
convert(uuid, &goal_info);
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
bool goal_exists;
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
goal_exists = rcl_action_server_goal_exists(pimpl_->action_server_.get(), &goal_info);
}

if (!goal_exists) {
throw std::runtime_error("Asked to publish result for goal that does not exist");
}

pimpl_->goal_results_[uuid] = result_msg;
{
std::lock_guard<std::recursive_mutex> lock(pimpl_->unordered_map_mutex_);
pimpl_->goal_results_[uuid] = result_msg;
}

// if there are clients who already asked for the result, send it to them
auto iter = pimpl_->result_requests_.find(uuid);
if (iter != pimpl_->result_requests_.end()) {
for (auto & request_header : iter->second) {
std::lock_guard<std::recursive_mutex> lock(pimpl_->action_server_reentrant_mutex_);
rcl_ret_t ret = rcl_action_send_result_response(
pimpl_->action_server_.get(), &request_header, result_msg.get());
if (RCL_RET_OK != ret) {
Expand Down
89 changes: 89 additions & 0 deletions rclcpp_action/test/test_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1221,3 +1221,92 @@ TEST_F(TestCancelRequestServer, publish_status_send_cancel_response_errors)

EXPECT_THROW(SendClientCancelRequest(), std::runtime_error);
}

class TestDeadlockServer : public TestServer
{
public:
void SetUp()
{
node_ = std::make_shared<rclcpp::Node>("goal_request", "/rclcpp_action/goal_request");
uuid1_ = {{1, 2, 3, 4, 5, 6, 70, 80, 9, 1, 11, 120, 13, 140, 15, 160}};
uuid2_ = {{2, 2, 3, 4, 5, 6, 70, 80, 9, 1, 11, 120, 13, 140, 15, 160}};
action_server_ = rclcpp_action::create_server<Fibonacci>(
node_, "fibonacci",
[this](const GoalUUID &, std::shared_ptr<const Fibonacci::Goal>) {
// instead of making a deadlock, check if it can acquire the lock in a second
std::unique_lock<std::recursive_timed_mutex> lock(server_mutex_, std::defer_lock);
this->TryLockFor(lock, std::chrono::milliseconds(1000));
return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
},
[this](std::shared_ptr<GoalHandle>) {
// instead of making a deadlock, check if it can acquire the lock in a second
std::unique_lock<std::recursive_timed_mutex> lock(server_mutex_, std::defer_lock);
this->TryLockFor(lock, std::chrono::milliseconds(1000));
return rclcpp_action::CancelResponse::ACCEPT;
},
[this](std::shared_ptr<GoalHandle> handle) {
// instead of making a deadlock, check if it can acquire the lock in a second
std::unique_lock<std::recursive_timed_mutex> lock(server_mutex_, std::defer_lock);
this->TryLockFor(lock, std::chrono::milliseconds(1000));
goal_handle_ = handle;
});
}

void GoalSucceeded()
{
std::lock_guard<std::recursive_timed_mutex> lock(server_mutex_);
rclcpp::sleep_for(std::chrono::milliseconds(100));
auto result = std::make_shared<Fibonacci::Result>();
result->sequence = {5, 8, 13, 21};
goal_handle_->succeed(result);
}

void GoalCanceled()
{
std::lock_guard<std::recursive_timed_mutex> lock(server_mutex_);
rclcpp::sleep_for(std::chrono::milliseconds(100));
auto result = std::make_shared<Fibonacci::Result>();
goal_handle_->canceled(result);
}

void TryLockFor(
std::unique_lock<std::recursive_timed_mutex> & lock,
std::chrono::milliseconds timeout
)
{
ASSERT_TRUE(lock.try_lock_for(timeout));
}

protected:
std::recursive_timed_mutex server_mutex_;
GoalUUID uuid1_, uuid2_;
std::shared_ptr<rclcpp::Node> node_;
std::shared_ptr<rclcpp_action::Server<Fibonacci>> action_server_;

using GoalHandle = rclcpp_action::ServerGoalHandle<Fibonacci>;
std::shared_ptr<GoalHandle> goal_handle_;
};

TEST_F(TestDeadlockServer, deadlock_while_succeed)
{
send_goal_request(node_, uuid1_);
// this will lock wrapper's mutex and intentionally wait 100ms for calling succeed
// to try to acquire the lock of rclcpp_action mutex
std::thread t(&TestDeadlockServer::GoalSucceeded, this);
// after the wrapper's mutex is locked and before succeed is called
rclcpp::sleep_for(std::chrono::milliseconds(50));
// call next goal request to intentionally reproduce deadlock
// this first locks rclcpp_action mutex and then call callback to lock wrapper's mutex
send_goal_request(node_, uuid2_);
t.join();
}

TEST_F(TestDeadlockServer, deadlock_while_canceled)
{
send_goal_request(node_, uuid1_);
send_cancel_request(node_, uuid1_);
std::thread t(&TestDeadlockServer::GoalCanceled, this);
rclcpp::sleep_for(std::chrono::milliseconds(50));
send_goal_request(node_, uuid2_); // deadlock here
t.join();
}