Skip to content

Commit 513a844

Browse files
committed
Fix multi-threaded race condition in client.call_async (ros2#871)
* fix multi-thread race condition in client.call (cherry picked from commit 301953d)
1 parent c52df06 commit 513a844

File tree

2 files changed

+36
-24
lines changed

2 files changed

+36
-24
lines changed

rclpy/rclpy/client.py

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def __init__(
6464
# True when the callback is ready to fire but has not been "taken" by an executor
6565
self._executor_event = False
6666

67+
self._lock = threading.Lock()
68+
6769
def call(self, request: SrvTypeRequest) -> SrvTypeResponse:
6870
"""
6971
Make a service request and wait for the result.
@@ -93,22 +95,6 @@ def unblock(future):
9395
raise future.exception()
9496
return future.result()
9597

96-
def remove_pending_request(self, future: Future) -> None:
97-
"""
98-
Remove a future from the list of pending requests.
99-
100-
This prevents a future from receiving a response and executing its done callbacks.
101-
102-
:param future: A future returned from :meth:`call_async`
103-
"""
104-
for seq, req_future in self._pending_requests.items():
105-
if future == req_future:
106-
try:
107-
del self._pending_requests[seq]
108-
except KeyError:
109-
pass
110-
break
111-
11298
def call_async(self, request: SrvTypeRequest) -> Future:
11399
"""
114100
Make a service request and asyncronously get the result.
@@ -122,18 +108,44 @@ def call_async(self, request: SrvTypeRequest) -> Future:
122108
if not isinstance(request, self.srv_type.Request):
123109
raise TypeError()
124110

125-
with self.handle:
126-
sequence_number = self.__client.send_request(request)
127-
if sequence_number in self._pending_requests:
128-
raise RuntimeError('Sequence (%r) conflicts with pending request' % sequence_number)
111+
with self._lock:
112+
with self.handle:
113+
sequence_number = self.__client.send_request(request)
114+
if sequence_number in self._pending_requests:
115+
raise RuntimeError(f'Sequence ({sequence_number}) conflicts with pending request')
129116

130-
future = Future()
131-
self._pending_requests[sequence_number] = future
117+
future = Future()
118+
self._pending_requests[sequence_number] = future
132119

133-
future.add_done_callback(self.remove_pending_request)
120+
future.add_done_callback(self.remove_pending_request)
134121

135122
return future
136123

124+
def get_pending_request(self, sequence_number: int) -> Future:
125+
"""
126+
Get a future from the list of pending requests.
127+
128+
:param sequence_number: Number identifying the pending request.
129+
:return: The future corresponding to the sequence_number.
130+
:raises: KeyError if the sequence_number is not in the pending requests.
131+
"""
132+
with self._lock:
133+
return self._pending_requests[sequence_number]
134+
135+
def remove_pending_request(self, future: Future) -> None:
136+
"""
137+
Remove a future from the list of pending requests.
138+
139+
This prevents a future from receiving a response and executing its done callbacks.
140+
141+
:param future: A future returned from :meth:`call_async`
142+
"""
143+
with self._lock:
144+
for seq, req_future in self._pending_requests.items():
145+
if future is req_future:
146+
del self._pending_requests[seq]
147+
break
148+
137149
def service_is_ready(self) -> bool:
138150
"""
139151
Check if there is a service server ready.

rclpy/rclpy/executors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ async def _execute_client(self, client, seq_and_response):
351351
if header is not None:
352352
try:
353353
sequence = header.request_id.sequence_number
354-
future = client._pending_requests[sequence]
354+
future = client.get_pending_request(sequence)
355355
except KeyError:
356356
# The request was cancelled
357357
pass

0 commit comments

Comments
 (0)