Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 55 additions & 49 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def get_nodes(self):
:rtype: list
"""
with self._nodes_lock:
return [node for node in self._nodes]
return list(self._nodes)

def spin(self):
"""Execute callbacks until shutdown."""
Expand Down Expand Up @@ -247,15 +247,15 @@ def handler():
_rclpy.rclpy_trigger_guard_condition(gc)
return handler

def _filter_eligible_entities(self, entities):
def can_execute(self, entity):
"""
Filter entities that should not be put onto the wait list.
Determine if a callback for an entity can be executed.

:param entity_list: Entities to be checked for eligibility
:type entity_list: list
:rtype: list
:param entity: Subscription, Timer, Guard condition, etc
:returns: True if the entity callback can be executed
:rtype: bool
"""
return [e for e in entities if e.callback_group.can_execute(e) and not e._executor_event]
return not entity._executor_event and entity.callback_group.can_execute(entity)

def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
"""
Expand Down Expand Up @@ -284,16 +284,16 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
clients = []
services = []
for node in nodes:
subscriptions.extend(self._filter_eligible_entities(node.subscriptions))
timers.extend(self._filter_eligible_entities(node.timers))
clients.extend(self._filter_eligible_entities(node.clients))
services.extend(self._filter_eligible_entities(node.services))
node_guards = self._filter_eligible_entities(node.guards)
subscriptions.extend(filter(self.can_execute, node.subscriptions))
timers.extend(filter(self.can_execute, node.timers))
clients.extend(filter(self.can_execute, node.clients))
services.extend(filter(self.can_execute, node.services))
node_guards = filter(self.can_execute, node.guards)
# retrigger a guard condition that was triggered but not handled
for gc in node_guards:
if gc._executor_triggered:
gc.trigger()
guards.extend(node_guards)
guards.append(gc)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be surprised if this is faster, and it appears to be exactly the same behavior. Do you have some reference which supports why this might be considered faster?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a consequence of line 291 where node_guards becomes a generator instead of a list.

An alternative that uses extend instead of append is:

                node_guards = list(filter(self.can_execute, node.guards))
                # retrigger a guard condition that was triggered but not handled
                for gc in node_guards:
                    if gc._executor_triggered:
                        gc.trigger()
                guards.extend(node_guards)

But that increases the executor overhead from 22.2% to 22.6% on my machine. I'm guessing the overhead of calling append for each item in the list is less than the overhead of creating an intermediate list and iterating internal to extend.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see so the removed line was implicitly guards.extend(list(node_guards)). 👍

(sigint_gc, sigint_gc_handle) = _rclpy.rclpy_get_sigint_guard_condition()
if timeout_timer is not None:
timers.append(timeout_timer)
Expand Down Expand Up @@ -341,47 +341,53 @@ def wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
_rclpy.rclpy_destroy_entity(sigint_gc)

# Mark all guards as triggered before yielding any handlers since they're auto-taken
for gc in [g for g in guards if g.guard_pointer in guards_ready]:
gc._executor_triggered = True
for gc in guards:
if gc.guard_pointer in guards_ready:
gc._executor_triggered = True

# Process ready entities one node at a time
for node in nodes:
for tmr in [t for t in node.timers if t.timer_pointer in timers_ready]:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
for tmr in node.timers:
if tmr.timer_pointer in timers_ready:
# Check that a timer is ready to workaround rcl issue with cancelled timers
if _rclpy.rclpy_is_timer_ready(tmr.timer_handle):
if tmr.callback_group.can_execute(tmr):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
yielded_work = True
yield handler, tmr, node

for sub in node.subscriptions:
if sub.subscription_pointer in subs_ready:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in node.guards:
if gc._executor_triggered:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in node.clients:
if client.client_pointer in clients_ready:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in node.services:
if srv.service_pointer in services_ready:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
tmr, self._take_timer, self._execute_timer)
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, tmr, node

for sub in [s for s in node.subscriptions if s.subscription_pointer in subs_ready]:
if sub.callback_group.can_execute(sub):
handler = self._make_handler(
sub, self._take_subscription, self._execute_subscription)
yielded_work = True
yield handler, sub, node

for gc in [g for g in node.guards if g._executor_triggered]:
if gc.callback_group.can_execute(gc):
handler = self._make_handler(
gc, self._take_guard_condition, self._execute_guard_condition)
yielded_work = True
yield handler, gc, node

for client in [c for c in node.clients if c.client_pointer in clients_ready]:
if client.callback_group.can_execute(client):
handler = self._make_handler(
client, self._take_client, self._execute_client)
yielded_work = True
yield handler, client, node

for srv in [s for s in node.services if s.service_pointer in services_ready]:
if srv.callback_group.can_execute(srv):
handler = self._make_handler(
srv, self._take_service, self._execute_service)
yielded_work = True
yield handler, srv, node
yield handler, srv, node

# Check timeout timer
if (timeout_nsec == 0 or
Expand Down