Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def wait(self, timeout_sec=None):
return True


class TimeoutException(Exception):
"""Signal that a timeout occurred."""

pass


class Executor:
"""
A base class for an executor.
Expand All @@ -94,6 +100,10 @@ def __init__(self):
# True if shutdown has been called
self._is_shutdown = False
self._work_tracker = _WorkTracker()
# State for wait_for_ready_callbacks to reuse generator
self._cb_iter = None
self._last_args = None
self._last_kwargs = None

def shutdown(self, timeout_sec=None):
"""
Expand All @@ -114,6 +124,9 @@ def shutdown(self, timeout_sec=None):
_rclpy.rclpy_destroy_entity(self._guard_condition)

self._guard_condition = None
self._cb_iter = None
self._last_args = None
self._last_kwargs = None
return True

def __del__(self):
Expand Down Expand Up @@ -257,10 +270,12 @@ def can_execute(self, entity):
"""
return not entity._executor_event and entity.callback_group.can_execute(entity)

def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Yield callbacks that are ready to be performed.

Raises :class:`TimeoutException` on timeout.

:param timeout_sec: Seconds to wait. Block forever if None or negative. Don't wait if 0
:type timeout_sec: float or None
:param nodes: A list of nodes to wait on. Wait on all nodes if None.
Expand Down Expand Up @@ -390,23 +405,45 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
(timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)):
break
if (
timeout_nsec == 0 or
(timeout_timer is not None and timeout_timer.timer_pointer in timers_ready)
):
raise TimeoutException()

def wait_for_ready_callbacks(self, *args, **kwargs):
"""
Reuse generator and return callbacks that are ready to be performed.

See :func:`Executor._wait_for_ready_callbacks` for documentation
"""
# if an old generator is done, this variable makes the loop get a new one before returning
got_generator = False
while not got_generator:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the loop a lot better; thanks!

if self._cb_iter is None or self._last_args != args or self._last_kwargs != kwargs:
# Create a new generator
self._last_args = args
self._last_kwargs = kwargs
self._cb_iter = self._wait_for_ready_callbacks(*args, **kwargs)
got_generator = True

try:
return next(self._cb_iter)
except StopIteration:
# Generator ran out of work
self._cb_iter = None


class SingleThreadedExecutor(Executor):
"""Runs callbacks in the thread which calls :func:`SingleThreadedExecutor.spin`."""

def __init__(self):
super().__init__()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that a method that is not overloaded in the child class means that the parent version of the method is used. I believe this applies to all methods included the __init_ one

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure about this, so I just tried it out with the following program:

class Base:
	def __init__(self):
		print("Base init")

class Sub:
	def do_something(self):
		print("Sub do_something")

x = Sub()
x.do_something()

If you run that under python3, the output you get is just "Sub do_something"; you never see the "Base init" printed out.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you try the same thing with Sub inheriting from base ?

class Base:
	def __init__(self):
		print("Base init")

class Sub(Base):
	def do_something(self):
		print("Sub do_something")

x = Sub()
x.do_something()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh, sorry. I forgot that bit. Never mind, you are totally correct.


def spin_once(self, timeout_sec=None):
try:
handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec))
handler()
except StopIteration:
handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
except TimeoutException:
pass
else:
handler()


class MultiThreadedExecutor(Executor):
Expand All @@ -431,7 +468,8 @@ def __init__(self, num_threads=None):

def spin_once(self, timeout_sec=None):
try:
handler, entity, node = next(self.wait_for_ready_callbacks(timeout_sec=timeout_sec))
self._executor.submit(handler)
except StopIteration:
handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
except TimeoutException:
pass
else:
self._executor.submit(handler)