Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 69 additions & 27 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
import inspect
import multiprocessing
from threading import Condition
from threading import Lock

from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.task import Task
from rclpy.timer import WallTimer
from rclpy.utilities import ok
from rclpy.utilities import timeout_sec_to_nsec
Expand Down Expand Up @@ -71,6 +73,16 @@ def wait(self, timeout_sec=None):
return True


async def await_or_execute(callback, *args):
"""Await a callback if it is a coroutine, else execute it."""
if inspect.iscoroutinefunction(callback):
# Await a coroutine
return await callback(*args)
else:
# Call a normal function
return callback(*args)


class TimeoutException(Exception):
"""Signal that a timeout occurred."""

Expand All @@ -93,6 +105,9 @@ def __init__(self):
super().__init__()
self._nodes = set()
self._nodes_lock = Lock()
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks = []
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
gc, gc_handle = _rclpy.rclpy_create_guard_condition()
self._guard_condition = gc
Expand All @@ -105,6 +120,22 @@ def __init__(self):
self._last_args = None
self._last_kwargs = None

def create_task(self, callback, *args, **kwargs):
"""
Add a callback or coroutine to be executed during :meth:`spin` and return a Future.

Arguments to this function are passed to the callback.

:param callback: A callback to be run in the executor
:type callback: callable or coroutine function
:rtype: :class:`rclpy.task.Future` instance
"""
task = Task(callback, args, kwargs, executor=self)
with self._tasks_lock:
self._tasks.append((task, None, None))
# Task inherits from Future
return task

def shutdown(self, timeout_sec=None):
"""
Stop executing callbacks and wait for their completion.
Expand Down Expand Up @@ -175,23 +206,23 @@ def spin_once(self, timeout_sec=None):
def _take_timer(self, tmr):
_rclpy.rclpy_call_timer(tmr.timer_handle)

def _execute_timer(self, tmr, _):
tmr.callback()
async def _execute_timer(self, tmr, _):
await await_or_execute(tmr.callback)

def _take_subscription(self, sub):
msg = _rclpy.rclpy_take(sub.subscription_handle, sub.msg_type)
return msg

def _execute_subscription(self, sub, msg):
async def _execute_subscription(self, sub, msg):
if msg:
sub.callback(msg)
await await_or_execute(sub.callback, msg)

def _take_client(self, client):
response = _rclpy.rclpy_take_response(
client.client_handle, client.srv_type.Response, client.sequence_number)
return response

def _execute_client(self, client, response):
async def _execute_client(self, client, response):
if response:
# clients spawn their own thread to wait for a response in the
# wait_for_future function. Users can either use this mechanism or monitor
Expand All @@ -203,42 +234,35 @@ def _take_service(self, srv):
srv.service_handle, srv.srv_type.Request)
return request_and_header

def _execute_service(self, srv, request_and_header):
async def _execute_service(self, srv, request_and_header):
if request_and_header is None:
return
(request, header) = request_and_header
if request:
response = srv.callback(request, srv.srv_type.Response())
response = await await_or_execute(srv.callback, request, srv.srv_type.Response())
srv.send_response(response, header)

def _take_guard_condition(self, gc):
gc._executor_triggered = False

def _execute_guard_condition(self, gc, _):
gc.callback()
async def _execute_guard_condition(self, gc, _):
await await_or_execute(gc.callback)

def _make_handler(self, entity, take_from_wait_list, call_callback):
def _make_handler(self, entity, node, take_from_wait_list, call_coroutine):
"""
Make a handler that performs work on an entity.

:param entity: An entity to wait on
:param take_from_wait_list: Makes the entity to stop appearing in the wait list
:type take_from_wait_list: callable
:param call_callback: Does the work the entity is ready for
:type call_callback: callable
:param call_coroutine: Does the work the entity is ready for
:type call_coroutine: coroutine function
:rtype: callable
"""
gc = self._guard_condition
work_tracker = self._work_tracker
is_shutdown = self._is_shutdown
# Mark this so it doesn't get added back to the wait list
entity._executor_event = True

def handler():
nonlocal entity
nonlocal gc
nonlocal is_shutdown
nonlocal work_tracker
async def handler(entity, gc, is_shutdown, work_tracker):
if is_shutdown or not entity.callback_group.beginning_execution(entity):
# Didn't get the callback, or the executor has been ordered to stop
entity._executor_event = False
Expand All @@ -252,13 +276,18 @@ def handler():
_rclpy.rclpy_trigger_guard_condition(gc)

try:
call_callback(entity, arg)
await call_coroutine(entity, arg)
finally:
entity.callback_group.ending_execution(entity)
# Signal that work has been done so the next callback in a mutually exclusive
# callback group can get executed
_rclpy.rclpy_trigger_guard_condition(gc)
return handler
task = Task(
handler, (entity, self._guard_condition, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
return task

def can_execute(self, entity):
"""
Expand Down Expand Up @@ -290,6 +319,18 @@ def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
if nodes is None:
nodes = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if not task.executing() and not task.done() and (node is None or node in nodes):
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

yielded_work = False
while not yielded_work and not self._is_shutdown:
# Gather entities that can be waited on
Expand Down Expand Up @@ -368,39 +409,40 @@ def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
tmr, node, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in node.subscriptions:
if sub.subscription_pointer in subs_ready:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
sub, node, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in node.guards:
if gc._executor_triggered:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
gc, node, self._take_guard_condition,
self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in node.clients:
if client.client_pointer in clients_ready:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
client, node, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in node.services:
if srv.service_pointer in services_ready:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
srv, node, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node

Expand Down
Loading