Skip to content

Commit c52df06

Browse files
committed
Added locking to prevent a race condition in the action client methods (send goal, cancel goal, get result) that occurs when a response arrives before its sequence number has been written to the internal structures; see ros2/rclpy#878
1 parent e3bc840 commit c52df06

File tree

1 file changed

+111
-93
lines changed

1 file changed

+111
-93
lines changed

rclpy/rclpy/action/client.py

Lines changed: 111 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ def __init__(
139139
:param feedback_sub_qos_profile: QoS profile for the feedback subscriber.
140140
:param status_sub_qos_profile: QoS profile for the status subscriber.
141141
"""
142+
143+
# lock to avoid race conditions on shared state (_sequence_number_to_goal_id and others)
144+
# without it the goal accepted response can arrive before its sequence number is placed
145+
# into the dictionary; ActionClient will drop the response in that case and the goal
146+
# will never complete!
147+
# see https://github.com/ros2/rclpy/issues/878
148+
self._internal_lock = threading.RLock()
149+
142150
if callback_group is None:
143151
callback_group = node.default_callback_group
144152

@@ -195,21 +203,23 @@ def _remove_pending_request(self, future, pending_requests):
195203
:return: The sequence number associated with the removed future, or
196204
None if the future was not found in the list.
197205
"""
198-
for seq, req_future in list(pending_requests.items()):
199-
if future == req_future:
200-
try:
201-
del pending_requests[seq]
202-
except KeyError:
203-
pass
204-
else:
205-
self.remove_future(future)
206-
return seq
206+
with self._internal_lock:
207+
for seq, req_future in list(pending_requests.items()):
208+
if future == req_future:
209+
try:
210+
del pending_requests[seq]
211+
except KeyError:
212+
pass
213+
else:
214+
self.remove_future(future)
215+
return seq
207216
return None
208217

209218
def _remove_pending_goal_request(self, future):
210-
seq = self._remove_pending_request(future, self._pending_goal_requests)
211-
if seq in self._sequence_number_to_goal_id:
212-
del self._sequence_number_to_goal_id[seq]
219+
with self._internal_lock:
220+
seq = self._remove_pending_request(future, self._pending_goal_requests)
221+
if seq in self._sequence_number_to_goal_id:
222+
del self._sequence_number_to_goal_id[seq]
213223

214224
def _remove_pending_cancel_request(self, future):
215225
self._remove_pending_request(future, self._pending_cancel_requests)
@@ -277,71 +287,76 @@ async def execute(self, taken_data):
277287
"""
278288
if 'goal' in taken_data:
279289
sequence_number, goal_response = taken_data['goal']
280-
if sequence_number in self._sequence_number_to_goal_id:
281-
goal_handle = ClientGoalHandle(
282-
self,
283-
self._sequence_number_to_goal_id[sequence_number],
284-
goal_response)
285-
286-
if goal_handle.accepted:
287-
goal_uuid = bytes(goal_handle.goal_id.uuid)
288-
if goal_uuid in self._goal_handles:
289-
raise RuntimeError(
290-
'Two goals were accepted with the same ID ({})'.format(goal_handle))
291-
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)
292-
293-
self._pending_goal_requests[sequence_number].set_result(goal_handle)
294-
else:
295-
self._node.get_logger().warning(
296-
'Ignoring unexpected goal response. There may be more than '
297-
f"one action server for the action '{self._action_name}'"
298-
)
290+
with self._internal_lock:
291+
if sequence_number in self._sequence_number_to_goal_id:
292+
goal_handle = ClientGoalHandle(
293+
self,
294+
self._sequence_number_to_goal_id[sequence_number],
295+
goal_response)
296+
297+
if goal_handle.accepted:
298+
goal_uuid = bytes(goal_handle.goal_id.uuid)
299+
if goal_uuid in self._goal_handles:
300+
raise RuntimeError(
301+
'Two goals were accepted with the same ID ({})'.format(goal_handle))
302+
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)
303+
304+
self._pending_goal_requests[sequence_number].set_result(goal_handle)
305+
else:
306+
self._node.get_logger().warning(
307+
'Ignoring unexpected goal response. There may be more than '
308+
f"one action server for the action '{self._action_name}'"
309+
)
299310

300311
if 'cancel' in taken_data:
301312
sequence_number, cancel_response = taken_data['cancel']
302-
if sequence_number in self._pending_cancel_requests:
303-
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
304-
else:
305-
self._node.get_logger().warning(
306-
'Ignoring unexpected cancel response. There may be more than '
307-
f"one action server for the action '{self._action_name}'"
308-
)
313+
with self._internal_lock:
314+
if sequence_number in self._pending_cancel_requests:
315+
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
316+
else:
317+
self._node.get_logger().warning(
318+
'Ignoring unexpected cancel response. There may be more than '
319+
f"one action server for the action '{self._action_name}'"
320+
)
309321

310322
if 'result' in taken_data:
311323
sequence_number, result_response = taken_data['result']
312-
if sequence_number in self._pending_result_requests:
313-
self._pending_result_requests[sequence_number].set_result(result_response)
314-
else:
315-
self._node.get_logger().warning(
316-
'Ignoring unexpected result response. There may be more than '
317-
f"one action server for the action '{self._action_name}'"
318-
)
324+
with self._internal_lock:
325+
if sequence_number in self._pending_result_requests:
326+
self._pending_result_requests[sequence_number].set_result(result_response)
327+
else:
328+
self._node.get_logger().warning(
329+
'Ignoring unexpected result response. There may be more than '
330+
f"one action server for the action '{self._action_name}'"
331+
)
319332

320333
if 'feedback' in taken_data:
321334
feedback_msg = taken_data['feedback']
322335
goal_uuid = bytes(feedback_msg.goal_id.uuid)
323336
# Call a registered callback if there is one
324-
if goal_uuid in self._feedback_callbacks:
325-
await await_or_execute(self._feedback_callbacks[goal_uuid], feedback_msg)
337+
with self._internal_lock:
338+
if goal_uuid in self._feedback_callbacks:
339+
await await_or_execute(self._feedback_callbacks[goal_uuid], feedback_msg)
326340

327341
if 'status' in taken_data:
328342
# Update the status of all goal handles maintained by this Action Client
329343
for status_msg in taken_data['status'].status_list:
330344
goal_uuid = bytes(status_msg.goal_info.goal_id.uuid)
331345
status = status_msg.status
332346

333-
if goal_uuid in self._goal_handles:
334-
goal_handle = self._goal_handles[goal_uuid]()
335-
if goal_handle is not None:
336-
goal_handle._status = status
337-
# Remove "done" goals from the list
338-
if (GoalStatus.STATUS_SUCCEEDED == status or
339-
GoalStatus.STATUS_CANCELED == status or
340-
GoalStatus.STATUS_ABORTED == status):
347+
with self._internal_lock:
348+
if goal_uuid in self._goal_handles:
349+
goal_handle = self._goal_handles[goal_uuid]()
350+
if goal_handle is not None:
351+
goal_handle._status = status
352+
# Remove "done" goals from the list
353+
if (GoalStatus.STATUS_SUCCEEDED == status or
354+
GoalStatus.STATUS_CANCELED == status or
355+
GoalStatus.STATUS_ABORTED == status):
356+
del self._goal_handles[goal_uuid]
357+
else:
358+
# Weak reference is None
341359
del self._goal_handles[goal_uuid]
342-
else:
343-
# Weak reference is None
344-
del self._goal_handles[goal_uuid]
345360

346361
def get_num_entities(self):
347362
"""Return number of each type of entity used in the wait set."""
@@ -421,22 +436,23 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
421436
request = self._action_type.Impl.SendGoalService.Request()
422437
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
423438
request.goal = goal
424-
sequence_number = self._client_handle.send_goal_request(request)
425-
if sequence_number in self._pending_goal_requests:
426-
raise RuntimeError(
427-
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
428-
429-
if feedback_callback is not None:
430-
# TODO(jacobperron): Move conversion function to a general-use package
431-
goal_uuid = bytes(request.goal_id.uuid)
432-
self._feedback_callbacks[goal_uuid] = feedback_callback
433-
434-
future = Future()
435-
self._pending_goal_requests[sequence_number] = future
436-
self._sequence_number_to_goal_id[sequence_number] = request.goal_id
437-
future.add_done_callback(self._remove_pending_goal_request)
438-
# Add future so executor is aware
439-
self.add_future(future)
439+
with self._internal_lock:
440+
sequence_number = self._client_handle.send_goal_request(request)
441+
if sequence_number in self._pending_goal_requests:
442+
raise RuntimeError(
443+
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
444+
445+
if feedback_callback is not None:
446+
# TODO(jacobperron): Move conversion function to a general-use package
447+
goal_uuid = bytes(request.goal_id.uuid)
448+
self._feedback_callbacks[goal_uuid] = feedback_callback
449+
450+
future = Future()
451+
self._pending_goal_requests[sequence_number] = future
452+
self._sequence_number_to_goal_id[sequence_number] = request.goal_id
453+
future.add_done_callback(self._remove_pending_goal_request)
454+
# Add future so executor is aware
455+
self.add_future(future)
440456

441457
return future
442458

@@ -479,16 +495,17 @@ def _cancel_goal_async(self, goal_handle):
479495

480496
cancel_request = CancelGoal.Request()
481497
cancel_request.goal_info.goal_id = goal_handle.goal_id
482-
sequence_number = self._client_handle.send_cancel_request(cancel_request)
483-
if sequence_number in self._pending_cancel_requests:
484-
raise RuntimeError(
485-
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
498+
with self._internal_lock:
499+
sequence_number = self._client_handle.send_cancel_request(cancel_request)
500+
if sequence_number in self._pending_cancel_requests:
501+
raise RuntimeError(
502+
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
486503

487-
future = Future()
488-
self._pending_cancel_requests[sequence_number] = future
489-
future.add_done_callback(self._remove_pending_cancel_request)
490-
# Add future so executor is aware
491-
self.add_future(future)
504+
future = Future()
505+
self._pending_cancel_requests[sequence_number] = future
506+
future.add_done_callback(self._remove_pending_cancel_request)
507+
# Add future so executor is aware
508+
self.add_future(future)
492509

493510
return future
494511

@@ -531,16 +548,17 @@ def _get_result_async(self, goal_handle):
531548

532549
result_request = self._action_type.Impl.GetResultService.Request()
533550
result_request.goal_id = goal_handle.goal_id
534-
sequence_number = self._client_handle.send_result_request(result_request)
535-
if sequence_number in self._pending_result_requests:
536-
raise RuntimeError(
537-
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
538-
539-
future = Future()
540-
self._pending_result_requests[sequence_number] = future
541-
future.add_done_callback(self._remove_pending_result_request)
542-
# Add future so executor is aware
543-
self.add_future(future)
551+
with self._internal_lock:
552+
sequence_number = self._client_handle.send_result_request(result_request)
553+
if sequence_number in self._pending_result_requests:
554+
raise RuntimeError(
555+
'Sequence ({}) conflicts with pending result request'.format(sequence_number))
556+
557+
future = Future()
558+
self._pending_result_requests[sequence_number] = future
559+
future.add_done_callback(self._remove_pending_result_request)
560+
# Add future so executor is aware
561+
self.add_future(future)
544562

545563
return future
546564

0 commit comments

Comments
 (0)