Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 53 additions & 13 deletions rclpy/rclpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from rclpy.utilities import get_rmw_implementation_identifier # noqa
from rclpy.utilities import ok
from rclpy.utilities import shutdown # noqa
from rclpy.utilities import shutdown as _shutdown
from rclpy.utilities import try_shutdown # noqa


Expand All @@ -27,33 +27,75 @@ def init(*, args=None):
args if args is not None else sys.argv)


# The global spin functions need an executor to do the work
# A persistent executor can re-run async tasks that yielded in rclpy.spin*().
__executor = None


def get_global_executor():
global __executor
if __executor is None:
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
__executor = SingleThreadedExecutor()
return __executor


def shutdown():
global __executor
if __executor is not None:
__executor.shutdown()
__executor = None
_shutdown()


def create_node(node_name, *, namespace=None):
# imported locally to avoid loading extensions on module import
from rclpy.node import Node
return Node(node_name, namespace=namespace)


def spin_once(node, *, timeout_sec=None):
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
executor = SingleThreadedExecutor()
"""
Execute one item of work or wait until timeout expires.

One callback will be executed in a SingleThreadedExecutor as long as that
callback is ready before the timeout expires.

It is possible the work done may be for a node other than the one passed to this method
if the global executor has a partially completed coroutine.

:param node: A node to add to the executor to check for work.
:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:return: Always returns None regardless whether work executes or timeout expires.
:rtype: None
"""
executor = get_global_executor()
try:
executor.add_node(node)
executor.spin_once(timeout_sec=timeout_sec)
finally:
executor.shutdown()
executor.remove_node(node)


def spin(node):
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
executor = SingleThreadedExecutor()
"""
Execute work blocking until the library is shutdown.

Callbacks will be executed in a SingleThreadedExecutor until shutdown() is called.
This method blocks.

:param node: A node to add to the executor to check for work.
:return: Always returns None regardless whether work executes or timeout expires.
:rtype: None
"""
executor = get_global_executor()
try:
executor.add_node(node)
while ok():
executor.spin_once()
finally:
executor.shutdown()
executor.remove_node(node)


def spin_until_future_complete(node, future):
Expand All @@ -66,11 +108,9 @@ def spin_until_future_complete(node, future):
:param future: The future object to wait on.
:type future: rclpy.task.Future
"""
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
executor = SingleThreadedExecutor()
executor = get_global_executor()
try:
executor.add_node(node)
executor.spin_until_future_complete(future)
finally:
executor.shutdown()
executor.remove_node(node)
11 changes: 11 additions & 0 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,17 @@ def add_node(self, node):
# Rebuild the wait set so it includes this new node
_rclpy.rclpy_trigger_guard_condition(self._guard_condition)

def remove_node(self, node):
"""Stop managing this node's callbacks."""
with self._nodes_lock:
try:
self._nodes.add(node)
except KeyError:
pass
else:
# Rebuild the wait set so it doesn't include this node
_rclpy.rclpy_trigger_guard_condition(self._guard_condition)

def get_nodes(self):
"""
Return nodes which have been added to this executor.
Expand Down
43 changes: 37 additions & 6 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@

class TestExecutor(unittest.TestCase):

@classmethod
def setUpClass(cls):
def setUp(self):
rclpy.init()
cls.node = rclpy.create_node('TestExecutor', namespace='/rclpy')
self.node = rclpy.create_node('TestExecutor', namespace='/rclpy')

@classmethod
def tearDownClass(cls):
cls.node.destroy_node()
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def func_execution(self, executor):
Expand Down Expand Up @@ -199,6 +197,39 @@ async def coro2():
self.assertTrue(future2.done())
self.assertEqual('Sentinel Result 2', future2.result())

def test_global_executor_completes_async_task(self):
self.assertIsNotNone(self.node.handle)

class TriggerAwait:

def __init__(self):
self.do_yield = True

def __await__(self):
while self.do_yield:
yield
return

trigger = TriggerAwait()
did_callback = False
did_return = False

async def timer_callback():
nonlocal trigger, did_callback, did_return
did_callback = True
await trigger
did_return = True

timer = self.node.create_timer(0.1, timer_callback)

rclpy.spin_once(self.node, timeout_sec=0.5)
self.assertTrue(did_callback)

timer.cancel()
trigger.do_yield = False
rclpy.spin_once(self.node, timeout_sec=0)
self.assertTrue(did_return)


if __name__ == '__main__':
unittest.main()