Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
from dataclasses import dataclass
from functools import partial
import inspect
import os
Expand All @@ -22,6 +24,7 @@
from threading import RLock
import time
from types import TracebackType
from typing import Deque
from typing import Any
from typing import Callable
from typing import ContextManager
Expand Down Expand Up @@ -176,6 +179,12 @@ def timeout(self, timeout: float) -> None:
self._timeout = timeout


@dataclass
class TaskData:
source_node: 'Optional[Node]' = None
source_entity: 'Optional[Entity]' = None


class Executor(ContextManager['Executor']):
"""
The base class for an executor.
Expand Down Expand Up @@ -205,8 +214,10 @@ def __init__(self, *, context: Optional[Context] = None) -> None:
self._context = get_default_context() if context is None else context
self._nodes: Set[Node] = set()
self._nodes_lock = RLock()
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks: List[Tuple[Task[Any], 'Optional[Entity]', Optional[Node]]] = []
# all tasks that are not complete or canceled
self._pending_tasks: Dict[Task, TaskData] = {}
# tasks that are ready to execute
self._ready_tasks: Deque[Task[Any]] = deque()
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
self._guard: Optional[GuardCondition] = GuardCondition(
Expand Down Expand Up @@ -253,6 +264,8 @@ def create_task(self, callback: Callable[..., Any], *args: Any, **kwargs: Any
:param callback: A callback to be run in the executor.
"""
task = Task(callback, args, kwargs, executor=self)
with self._tasks_lock:
self._pending_tasks[task] = TaskData()
self._call_task_in_next_spin(task)
return task

Expand All @@ -263,7 +276,7 @@ def _call_task_in_next_spin(self, task: Task) -> None:
:param task: A task to be run in the executor.
"""
with self._tasks_lock:
self._tasks.append((task, None, None))
self._ready_tasks.append(task)
if self._guard:
self._guard.trigger()

Expand Down Expand Up @@ -631,6 +644,11 @@ async def handler(entity: 'EntityT', gc: GuardCondition, is_shutdown: bool,
task: Task[None] = Task(
handler, (entity, self._guard, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._pending_tasks[task] = TaskData(
source_entity=entity,
source_node=node
)
return task

def can_execute(self, entity: 'Entity') -> bool:
Expand Down Expand Up @@ -675,21 +693,25 @@ def _wait_for_ready_callbacks(
nodes_to_use = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
# Tasks that need to be executed again will add themselves back to the executor
self._tasks = []
for task_trio in tasks:
task, entity, node = task_trio
# Get rid of any tasks that are done or cancelled
for task in list(self._pending_tasks.keys()):
if task.done() or task.cancelled():
del self._pending_tasks[task]

ready_tasks_count = len(self._ready_tasks)
for _ in range(ready_tasks_count):
task = self._ready_tasks.popleft()
task_data = self._pending_tasks[task]
node = task_data.source_node
if node is None or node in nodes_to_use:
entity = task_data.source_entity
yielded_work = True
yield task_trio
yield task, entity, node
else:
# Asked not to execute these tasks, so don't do them yet
with self._tasks_lock:
self._tasks.append(task_trio)

self._ready_tasks.append(task)
# Gather entities that can be waited on
subscriptions: List[Subscription[Any, ]] = []
guards: List[GuardCondition] = []
Expand Down