Changes from 38 commits
40 commits
78c50be
Make pending() a public method of Future
bjsowa Jun 6, 2025
428e4e7
Use yielded future to determine whether task is ready to be resumed
bjsowa Jun 6, 2025
defad88
Use guard condition to wake up the executor when the task becomes ready
bjsowa Jun 6, 2025
19e9c0a
Add a test for resuming coroutine
bjsowa Jun 6, 2025
534acce
Add call_soon method to the executor
bjsowa Jul 9, 2025
23e334f
Revert changes to executors.py and task.py
bjsowa Jul 9, 2025
6695847
Make the tasks schedule itself to the executor when they can be resumed
bjsowa Jul 9, 2025
c6f729c
Fix flake8 error
bjsowa Jul 9, 2025
2a35478
Fix dependent coroutine test
bjsowa Jul 9, 2025
e39d310
Disable EventsExecutor tests that result in Segmentation fault
bjsowa Jul 9, 2025
458a73c
Don't schedule tasks when yielding handlers
bjsowa Jul 9, 2025
bde1872
Don't remove tasks not managed by this executor
bjsowa Jul 9, 2025
faafa26
Raise an error when future awaited a second time
bjsowa Jul 19, 2025
8c1bb36
Disable segfaulting test in EventsExecutor
bjsowa Jul 19, 2025
be207ce
Raise exception when resuming a task without an executor
bjsowa Jul 19, 2025
a79df94
Fix flake8 warnings
bjsowa Jul 19, 2025
6c19b8e
call_soon -> _call_task_in_next_spin
bjsowa Jul 19, 2025
aa5da61
Fix coroutine task requiring 2 spins to resume
bjsowa Jul 19, 2025
eab03e3
Merge remote-tracking branch 'origin/rolling' into fix/async-task-resume
bjsowa Aug 8, 2025
2eb1cfb
Move coroutine yield test to executor tests
bjsowa Aug 20, 2025
35be3cc
Assert elapsed time in coroutine thread wakeup test
bjsowa Aug 20, 2025
d88d94e
Remove unused imports
bjsowa Aug 20, 2025
44b0f62
Merge remote-tracking branch 'origin/rolling' into fix/async-task-resume
bjsowa Sep 6, 2025
bf46550
Set future executor if awaiting a future without one and warn users
bjsowa Sep 7, 2025
91890f3
Use get_logger function
bjsowa Sep 7, 2025
e3dd676
Fix return type of Future.__await__
bjsowa Sep 7, 2025
98a9b7a
Move coroutine yield logic out of try block, add helper method
bjsowa Sep 7, 2025
bab3ba9
Update coroutine handling exception messages
bjsowa Sep 7, 2025
e0f21bf
Use DeprecationWarning instead of logging
bjsowa Sep 7, 2025
a0d5592
Remove redundant comment
bjsowa Sep 7, 2025
5a99757
Remove deprecation warning
bjsowa Sep 9, 2025
d8db620
Merge remote-tracking branch 'origin/rolling' into fix/async-task-resume
bjsowa Sep 12, 2025
5c0408e
Use executor.create_future() in test
bjsowa Sep 12, 2025
991afa0
Add call_task_in_next_spin to EventsExecutor
bjsowa Sep 13, 2025
d27bf64
Reenable EventsExecutor tests
bjsowa Sep 13, 2025
dfa0904
Fix new task handling in EventsExecutor
nadavelkabets Sep 13, 2025
23f06e0
Remove unnecessary whitespace
bjsowa Sep 14, 2025
cf67589
Enable wake_from_another_thread test for EventsExecutor
bjsowa Sep 14, 2025
9771f6f
Feature: store source entity and node info for pending tasks (#2)
nadavelkabets Oct 13, 2025
5ce6fbd
Fix flake8 warnings
bjsowa Oct 13, 2025
35 changes: 20 additions & 15 deletions rclpy/rclpy/executors.py
@@ -253,12 +253,19 @@ def create_task(self, callback: Callable[..., Any], *args: Any, **kwargs: Any
:param callback: A callback to be run in the executor.
"""
task = Task(callback, args, kwargs, executor=self)
self._call_task_in_next_spin(task)
return task

def _call_task_in_next_spin(self, task: Task) -> None:
"""
Add a task to the executor to be executed in the next spin.

:param task: A task to be run in the executor.
"""
with self._tasks_lock:
self._tasks.append((task, None, None))

Contributor

I now realize that we still have some topics to discuss.
We are changing behavior here: previously, the executor always yielded the node and entity, even for blocked tasks. Now, any task that is resumed using _call_task_in_next_spin yields None for both entity and node from its second run onwards.
SingleThreadedExecutor and MultiThreadedExecutor ignore these yielded values, but the node argument is also used to filter tasks. Unlike before, I believe the new code will keep running tasks created by a node that has since been removed, for example.

            for task_trio in tasks:
                task, entity, node = task_trio
                if node is None or node in nodes_to_use:

We should discuss this behavior change further before proceeding.
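To make the behavior change concrete, here is a minimal sketch using plain tuples instead of rclpy objects. Nodes are plain strings, and `runnable()` is a hypothetical helper that applies the same `node is None or node in nodes_to_use` check quoted above; it is not part of rclpy.

```python
from typing import List, Optional, Set, Tuple

# A queued entry, as in executor._tasks: (task, entity, node).
Trio = Tuple[str, Optional[str], Optional[str]]


def runnable(tasks: List[Trio], nodes_to_use: Set[str]) -> List[str]:
    """Return the task names that pass the executor's node filter."""
    return [task for task, _entity, node in tasks
            if node is None or node in nodes_to_use]


nodes_to_use = {'node_b'}  # 'node_a' has been removed from the executor

# First run: the task created by node_a's subscription still carries its node.
first_run = [('task_from_a', 'sub_a', 'node_a')]
# Later runs: _call_task_in_next_spin re-queues it as (task, None, None).
later_run = [('task_from_a', None, None)]

print(runnable(first_run, nodes_to_use))  # []
print(runnable(later_run, nodes_to_use))  # ['task_from_a']
```

Once the node information is dropped, the `node is None` branch lets the task through no matter which nodes are still attached.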

Author

Do you see any way this can be fixed?

Contributor, nadavelkabets, Sep 24, 2025

Yes, but that depends on the intended behavior.
If we are okay with the "set and forget" approach, no change is required.
Otherwise, for each node in the executor we can hold a set of running tasks.
When a node is removed from the executor, all tasks related to it should be cancelled.
It's also possible to initialize a task with the node that it originated from as a property instead of this task_trio.
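The per-node bookkeeping option could look roughly like the sketch below. `SketchTask` and `NodeTaskRegistry` are hypothetical names used only for illustration; the real `rclpy.task.Task` and `Executor` are not modeled, and nodes are plain strings.

```python
from typing import Dict, Optional, Set


class SketchTask:
    """Stand-in for rclpy.task.Task that only tracks cancellation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def cancelled(self) -> bool:
        return self._cancelled


class NodeTaskRegistry:
    """Hypothetical per-node record of the tasks an executor is running."""

    def __init__(self) -> None:
        self._tasks_by_node: Dict[str, Set[SketchTask]] = {}

    def add_node(self, node: str) -> None:
        self._tasks_by_node.setdefault(node, set())

    def track(self, task: SketchTask, node: Optional[str]) -> None:
        # Tasks from executor.create_task() have no originating node.
        if node is not None:
            self._tasks_by_node[node].add(task)

    def discard(self, task: SketchTask, node: Optional[str]) -> None:
        # Call when a task finishes so the registry does not grow forever.
        if node is not None:
            self._tasks_by_node.get(node, set()).discard(task)

    def remove_node(self, node: str) -> None:
        # Cancel everything the removed node started so it is never resumed.
        for task in self._tasks_by_node.pop(node, set()):
            task.cancel()


registry = NodeTaskRegistry()
registry.add_node('node_a')
pending = SketchTask('service_callback')
registry.track(pending, 'node_a')
registry.remove_node('node_a')
print(pending.cancelled())  # True
```

The trade-off is an extra bookkeeping step on every task start and finish, in exchange for removal semantics that no longer depend on what gets yielded alongside the task.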

Author, bjsowa, Sep 24, 2025

I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler. Maybe we should find some other way to track which node created the task?
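Another option raised earlier in the thread is to store the origin on the task itself, so it survives re-queuing. A minimal sketch follows. `OriginTask`, `SketchExecutor`, and their attributes are hypothetical. This is not the code from the later commit 9771f6f, although that commit's title ("store source entity and node info for pending tasks") suggests a related direction.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(eq=False)  # identity comparison, like real task objects
class OriginTask:
    """Hypothetical task that remembers the entity and node it came from."""
    name: str
    source_entity: Optional[str] = None
    source_node: Optional[str] = None


@dataclass
class SketchExecutor:
    nodes: Set[str] = field(default_factory=set)
    tasks: List[OriginTask] = field(default_factory=list)

    def call_task_in_next_spin(self, task: OriginTask) -> None:
        # Only the task is queued; its origin travels with it.
        self.tasks.append(task)

    def take_runnable(self) -> List[OriginTask]:
        """Remove and return queued tasks whose node is still in the executor."""
        runnable = [t for t in self.tasks
                    if t.source_node is None or t.source_node in self.nodes]
        self.tasks = [t for t in self.tasks if t not in runnable]
        return runnable


executor = SketchExecutor(nodes={'node_a'})
task = OriginTask('timer_callback', source_entity='timer', source_node='node_a')
executor.call_task_in_next_spin(task)
executor.nodes.discard('node_a')  # node removed while the task was suspended
print(executor.take_runnable())   # []
```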

Author

@nadavelkabets any ideas?

Contributor

> I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler.

Exactly. I would like to discuss this topic with the core maintainers to get their opinion.
I'm not sure whether we should avoid changing this behavior or whether this is a welcome change.

if self._guard:
self._guard.trigger()
# Task inherits from Future
return task

def create_future(self) -> Future:
"""Create a Future object attached to the Executor."""
@@ -624,8 +631,6 @@ async def handler(entity: 'EntityT', gc: GuardCondition, is_shutdown: bool,
task: Task[None] = Task(
handler, (entity, self._guard, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
return task

def can_execute(self, entity: 'Entity') -> bool:
@@ -673,17 +678,17 @@ def _wait_for_ready_callbacks(
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in tasks:
if (not task.executing() and not task.done() and
(node is None or node in nodes_to_use)):
yielded_work = True
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))
# Tasks that need to be executed again will add themselves back to the executor
self._tasks = []
for task_trio in tasks:
task, entity, node = task_trio
if node is None or node in nodes_to_use:
yielded_work = True
yield task_trio
else:
# Asked not to execute these tasks, so don't do them yet
with self._tasks_lock:
self._tasks.append(task_trio)

Contributor

Should we maintain this behavior if the node is not in nodes_to_use?
It causes an infinite loop in the following scenario:

  • A task created by an entity callback is awaiting something
  • The node of the task is removed from the executor

In this case we will keep removing and adding the task forever.
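The churn can be reproduced outside rclpy. The sketch below is a hypothetical, simplified model of the snapshot/clear/re-append loop in `_wait_for_ready_callbacks` shown in the diff. Nodes are plain strings and tasks are names.

```python
from typing import List, Optional, Set, Tuple

Trio = Tuple[str, Optional[str], Optional[str]]


def spin_once(pending: List[Trio], nodes_to_use: Set[str]) -> Tuple[List[Trio], List[str], int]:
    """Model one pass of the snapshot/clear/re-append logic.

    Returns (new queue, names of yielded tasks, number of trios re-appended).
    """
    snapshot = list(pending)  # copy, then start from an empty queue
    requeued: List[Trio] = []
    yielded: List[str] = []
    reappended = 0
    for task, entity, node in snapshot:
        if node is None or node in nodes_to_use:
            yielded.append(task)
        else:
            # "Asked not to execute these tasks": put the trio straight back.
            requeued.append((task, entity, node))
            reappended += 1
    return requeued, yielded, reappended


pending: List[Trio] = [('blocked_task', 'sub_a', 'node_a')]
total_reappends = 0
for _ in range(5):  # node_a was removed, so it never appears in nodes_to_use
    pending, yielded, count = spin_once(pending, nodes_to_use={'node_b'})
    total_reappends += count
print(total_reappends, pending)  # 5 [('blocked_task', 'sub_a', 'node_a')]
```

Every spin removes and re-adds the same trio, and the task never runs.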

Author, bjsowa, Sep 24, 2025

I could change it to something like this:

# Yield tasks in-progress before waiting for new work
with self._tasks_lock:
    # Retrieve a list of tasks that can be executed now
    tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is None or node in nodes_to_use
    ]
    # Remove tasks that are going to be executed now from the executor
    # Tasks that need to be executed again will add themselves back to the executor
    self._tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is not None and node not in nodes_to_use
    ]

yield from tasks

It's not much of an optimization, but IMO it looks cleaner, and we no longer remove and re-add the same tasks.
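The proposed split can also be done in a single pass, evaluating the node check once per task. `partition_tasks` is a hypothetical standalone helper, not an rclpy function. It keeps the queue order of both groups.

```python
from typing import List, Optional, Set, Tuple

Trio = Tuple[str, Optional[str], Optional[str]]


def partition_tasks(tasks: List[Trio], nodes_to_use: Set[str]) -> Tuple[List[Trio], List[Trio]]:
    """Split queued trios into (run now, keep queued) in one pass."""
    run_now: List[Trio] = []
    keep: List[Trio] = []
    for trio in tasks:
        node = trio[2]
        if node is None or node in nodes_to_use:
            run_now.append(trio)
        else:
            keep.append(trio)  # stays in executor._tasks untouched
    return run_now, keep


queued: List[Trio] = [
    ('created_task', None, None),
    ('resumed_from_a', 'sub_a', 'node_a'),
    ('resumed_from_b', 'timer_b', 'node_b'),
]
run_now, keep = partition_tasks(queued, nodes_to_use={'node_b'})
print([t[0] for t in run_now])  # ['created_task', 'resumed_from_b']
print([t[0] for t in keep])     # ['resumed_from_a']
```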


# Gather entities that can be waited on
subscriptions: List[Subscription[Any, ]] = []

62 changes: 48 additions & 14 deletions rclpy/rclpy/task.py
@@ -66,10 +66,13 @@ def __del__(self) -> None:
'The following exception was never retrieved: ' + str(self._exception),
file=sys.stderr)

def __await__(self) -> Generator[None, None, Optional[T]]:
def __await__(self) -> Generator['Future[T]', None, Optional[T]]:
# Yield if the task is not finished
while self._pending():
yield
if self._pending():
# This tells the task to suspend until the future is done
yield self
if self._pending():
raise RuntimeError('Future awaited a second time before it was done')
return self.result()

def _pending(self) -> bool:
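The new `__await__` yields the future itself rather than a bare `None`, so the code driving the coroutine can see what it is waiting on. `SketchFuture` below is a stripped-down, hypothetical stand-in for `rclpy.task.Future` (no executor, callbacks, or cancellation). It shows both the suspension and the new second-await check when a coroutine is driven by hand with `send(None)`.

```python
from typing import Any, Generator, Optional


class SketchFuture:
    """Stripped-down future mirroring the new Future.__await__ from the diff."""

    def __init__(self) -> None:
        self._done = False
        self._result: Any = None

    def _pending(self) -> bool:
        return not self._done

    def set_result(self, value: Any) -> None:
        self._result = value
        self._done = True

    def result(self) -> Optional[Any]:
        return self._result

    def __await__(self) -> Generator['SketchFuture', None, Optional[Any]]:
        if self._pending():
            # Hand the future itself to whoever is driving the coroutine.
            yield self
        if self._pending():
            raise RuntimeError('Future awaited a second time before it was done')
        return self.result()


async def waiter(fut: SketchFuture) -> Any:
    return await fut


fut = SketchFuture()
coro = waiter(fut)
yielded = coro.send(None)   # the coroutine suspends on fut
print(yielded is fut)       # True
fut.set_result(42)
try:
    coro.send(None)         # resumed after the future completed
except StopIteration as stop:
    final = stop.value
print(final)                # 42

# Resuming while the future is still pending trips the new check.
early = waiter(SketchFuture())
early.send(None)
try:
    early.send(None)
except RuntimeError as err:
    message = str(err)
print(message)              # Future awaited a second time before it was done
```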
@@ -298,17 +301,7 @@ def __call__(self) -> None:
self._executing = True

if inspect.iscoroutine(self._handler):
# Execute a coroutine
handler = self._handler
try:
handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
self.set_result(e.value)
self._complete_task()
except Exception as e:
self.set_exception(e)
self._complete_task()
self._execute_coroutine_step(self._handler)
else:
# Execute a normal function
try:
@@ -322,6 +315,47 @@ def __call__(self) -> None:
finally:
self._task_lock.release()

def _execute_coroutine_step(self, coro: Coroutine[Any, Any, T]) -> None:
"""Execute or resume a coroutine task."""
try:
result = coro.send(None)
except StopIteration as e:
# The coroutine finished; store the result
self.set_result(e.value)
self._complete_task()
except Exception as e:
# The coroutine raised; store the exception
self.set_exception(e)
self._complete_task()
else:
# The coroutine yielded; suspend the task until it is resumed
executor = self._executor()
if executor is None:
raise RuntimeError(
'Task tried to reschedule but no executor was set: '
'tasks should only be initialized through executor.create_task()')
elif isinstance(result, Future):
# Schedule the task to resume when the future is done
self._add_resume_callback(result, executor)
elif result is None:
# The coroutine yielded None, schedule the task to resume in the next spin
executor._call_task_in_next_spin(self)
else:
raise TypeError(
f'Expected coroutine to yield a Future or None, got: {type(result)}')

def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None:
future_executor = future._executor()
if future_executor is None:
# The future is not associated with an executor yet, so associate it with ours
future._set_executor(executor)
elif future_executor is not executor:
raise RuntimeError('A task can only await futures associated with the same executor')

# The future is associated with the same executor, so we can resume the task directly
# in the done callback
future.add_done_callback(lambda _: self.__call__())

This comment was marked as outdated.

Author

I'm not sure I follow. At the time the coroutine yields a future, we know that this future is still pending, so we tell the future to schedule the next call of the task on the executor once the future either finishes or is cancelled. A cancelled future will simply return None on the next send.
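The dispatch in `_execute_coroutine_step` can be modeled with a toy executor. Everything here is hypothetical (`SketchFuture`, `SketchTask`, `SketchExecutor`). Unlike rclpy, the done callback re-queues the task instead of calling it directly, and exceptions and cancellation are left out. The point is that a task suspended on a pending future is not polled at all until that future completes.

```python
from collections import deque
from typing import Any, Callable, Coroutine, Deque, List


class SketchFuture:
    """Minimal future: a result plus done callbacks (no cancellation)."""

    def __init__(self) -> None:
        self._done = False
        self._result: Any = None
        self._callbacks: List[Callable[['SketchFuture'], None]] = []

    def done(self) -> bool:
        return self._done

    def result(self) -> Any:
        return self._result

    def set_result(self, value: Any) -> None:
        self._result, self._done = value, True
        for callback in self._callbacks:
            callback(self)

    def add_done_callback(self, callback: Callable[['SketchFuture'], None]) -> None:
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)

    def __await__(self):
        if not self._done:
            yield self  # tell the task what it is waiting on
        return self._result


class SketchExecutor:
    def __init__(self) -> None:
        self._ready: Deque['SketchTask'] = deque()

    def call_task_in_next_spin(self, task: 'SketchTask') -> None:
        self._ready.append(task)

    def spin_once(self) -> None:
        # Only run tasks queued before this spin started.
        for _ in range(len(self._ready)):
            self._ready.popleft().step()


class SketchTask(SketchFuture):
    def __init__(self, coro: Coroutine[Any, Any, Any], executor: SketchExecutor) -> None:
        super().__init__()
        self._coro = coro
        self._executor = executor
        executor.call_task_in_next_spin(self)

    def step(self) -> None:
        try:
            yielded = self._coro.send(None)
        except StopIteration as stop:
            self.set_result(stop.value)
            return
        if isinstance(yielded, SketchFuture):
            # Suspend: the task is re-queued only when the future completes.
            yielded.add_done_callback(
                lambda _: self._executor.call_task_in_next_spin(self))
        elif yielded is None:
            # Bare yield (e.g. asyncio.sleep(0)): resume in the next spin.
            self._executor.call_task_in_next_spin(self)
        else:
            raise TypeError(f'Expected a future or None, got: {type(yielded)}')


executor = SketchExecutor()
gate = SketchFuture()


async def job() -> str:
    value = await gate
    return f'got {value}'


task = SketchTask(job(), executor)
executor.spin_once()     # runs until `await gate`
executor.spin_once()     # nothing queued: the suspended task is not polled
print(task.done())       # False
gate.set_result('data')  # the done callback re-queues the task
executor.spin_once()
print(task.result())     # got data
```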


def _complete_task(self) -> None:
"""Cleanup after task finished."""
self._handler = None

38 changes: 7 additions & 31 deletions rclpy/src/rclpy/events_executor/events_executor.cpp
@@ -75,10 +75,15 @@ pybind11::object EventsExecutor::create_task(
// manual refcounting on it instead.
py::handle cb_task_handle = task;
cb_task_handle.inc_ref();
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, cb_task_handle));
call_task_in_next_spin(task);
return task;
}

void EventsExecutor::call_task_in_next_spin(pybind11::handle task)
{
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, task));
}

pybind11::object EventsExecutor::create_future()
{
using py::literals::operator""_a;
@@ -164,8 +169,6 @@ void EventsExecutor::spin(std::optional<double> timeout_sec, bool stop_after_use
throw std::runtime_error("Attempt to spin an already-spinning Executor");
}
stop_after_user_callback_ = stop_after_user_callback;
// Any blocked tasks may have become unblocked while we weren't looking.
PostOutstandingTasks();
// Release the GIL while we block. Any callbacks on the events queue that want to touch Python
// will need to reacquire it though.
py::gil_scoped_release gil_release;
@@ -347,8 +350,6 @@ void EventsExecutor::HandleSubscriptionReady(py::handle subscription, size_t num
got_none = true;
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedTimer(py::handle timer) {timers_manager_.AddTimer(timer);}
@@ -390,7 +391,6 @@ void EventsExecutor::HandleTimerReady(py::handle timer, const rcl_timer_call_inf
} else if (stop_after_user_callback_) {
events_queue_.Stop();
}
PostOutstandingTasks();
}

void EventsExecutor::HandleAddedClient(py::handle client)
@@ -461,8 +461,6 @@ void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_event
}
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedService(py::handle service)
@@ -525,8 +523,6 @@ void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_eve
send_response(response, header);
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedWaitable(py::handle waitable)
@@ -792,8 +788,6 @@ void EventsExecutor::HandleWaitableReady(
// execute() is an async method, we need a Task to run it
create_task(execute(data));
}

PostOutstandingTasks();
}

void EventsExecutor::IterateTask(py::handle task)
@@ -826,26 +820,7 @@ void EventsExecutor::IterateTask(py::handle task)
throw;
}
}
} else {
// Task needs more iteration. Store the handle and revisit it later after the next ready
// entity which may unblock it.
// TODO(bmartin427) This matches the behavior of SingleThreadedExecutor and avoids busy
// looping, but I don't love it because if the task is waiting on something other than an rcl
// entity (e.g. an asyncio sleep, or a Future triggered from another thread, or even another
// Task), there can be arbitrarily long latency before some rcl activity causes us to go
// revisit that Task.
blocked_tasks_.push_back(task);
}
}

void EventsExecutor::PostOutstandingTasks()
{
for (auto & task : blocked_tasks_) {
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, task));
}
// Clear the entire outstanding tasks list. Any tasks that need further iteration will re-add
// themselves during IterateTask().
blocked_tasks_.clear();
}
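The latency problem described in the removed TODO can be illustrated with a toy events queue. `EventsQueueSketch` is a hypothetical Python model, not the C++ `EventsExecutor`; its `blocked` list stands in for the removed `blocked_tasks_`, and `on_future_done` stands in for a Future completion that calls `call_task_in_next_spin`.

```python
from collections import deque
from typing import Callable, Deque, List


class EventsQueueSketch:
    """Hypothetical model of an events queue; not the C++ EventsExecutor."""

    def __init__(self) -> None:
        self.queue: Deque[Callable[[], None]] = deque()
        self.blocked: List[Callable[[], None]] = []  # used by the old design only

    def run_pending(self) -> int:
        """Drain the queue and return how many callbacks ran."""
        ran = 0
        while self.queue:
            self.queue.popleft()()
            ran += 1
        return ran

    def post_outstanding_tasks(self) -> None:
        # Old design: only called from Handle*Ready(), i.e. after an rcl event.
        self.queue.extend(self.blocked)
        self.blocked.clear()


resumed: List[str] = []

# Old design: the task is parked in `blocked` after its last iteration.
old = EventsQueueSketch()
old.blocked.append(lambda: resumed.append('old'))
# The awaited Future completes here, but nothing re-posts the task...
print(old.run_pending(), resumed)  # 0 []
old.post_outstanding_tasks()       # ...until some unrelated rcl event is handled
print(old.run_pending(), resumed)  # 1 ['old']

# New design: the Future's completion enqueues the task directly.
new = EventsQueueSketch()


def on_future_done() -> None:
    new.queue.append(lambda: resumed.append('new'))  # like call_task_in_next_spin()


on_future_done()
print(new.run_pending(), resumed)  # 1 ['old', 'new']
```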

void EventsExecutor::HandleCallbackExceptionInNodeEntity(
@@ -904,6 +879,7 @@ void define_events_executor(py::object module)
.def(py::init<py::object>(), py::arg("context"))
.def_property_readonly("context", &EventsExecutor::get_context)
.def("create_task", &EventsExecutor::create_task, py::arg("callback"))
.def("_call_task_in_next_spin", &EventsExecutor::call_task_in_next_spin, py::arg("task"))
.def("create_future", &EventsExecutor::create_future)
.def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none())
.def("add_node", &EventsExecutor::add_node, py::arg("node"))

9 changes: 1 addition & 8 deletions rclpy/src/rclpy/events_executor/events_executor.hpp
@@ -67,6 +67,7 @@ class EventsExecutor
pybind11::object get_context() const {return rclpy_context_;}
pybind11::object create_task(
pybind11::object callback, pybind11::args args = {}, const pybind11::kwargs & kwargs = {});
void call_task_in_next_spin(pybind11::handle task);
pybind11::object create_future();
bool shutdown(std::optional<double> timeout_sec = {});
bool add_node(pybind11::object node);
@@ -149,11 +150,6 @@ class EventsExecutor
/// create_task() implementation for details.
void IterateTask(pybind11::handle task);

/// Posts a call to IterateTask() for every outstanding entry in tasks_; should be invoked from
/// other Handle*Ready() methods to check if any asynchronous Tasks have been unblocked by the
/// newly-handled event.
void PostOutstandingTasks();

void HandleCallbackExceptionInNodeEntity(
const pybind11::error_already_set &, pybind11::handle entity,
const std::string & node_entity_attr);
@@ -190,9 +186,6 @@ class EventsExecutor
pybind11::set services_;
pybind11::set waitables_;

/// Collection of asynchronous Tasks awaiting new events to further iterate.
std::vector<pybind11::handle> blocked_tasks_;

/// Cache for rcl pointers underlying each waitables_ entry, because those are harder to retrieve
/// than the other entity types.
std::unordered_map<pybind11::handle, WaitableSubEntities, PythonHasher,

66 changes: 66 additions & 0 deletions rclpy/test/test_executor.py
@@ -297,6 +297,40 @@ async def coroutine() -> str:
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_create_task_coroutine_yield(self) -> None:
self.assertIsNotNone(self.node.handle)
for cls in [SingleThreadedExecutor, EventsExecutor]:
with self.subTest(cls=cls):
executor = cls(context=self.context)
executor.add_node(self.node)

called1 = False
called2 = False

async def coroutine() -> str:
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True
return 'Sentinel Result'

future = executor.create_task(coroutine)
self.assertFalse(future.done())
self.assertFalse(called1)
self.assertFalse(called2)

executor.spin_once(timeout_sec=0)
self.assertFalse(future.done())
self.assertTrue(called1)
self.assertFalse(called2)

executor.spin_once(timeout_sec=1)

This comment was marked as outdated.

Author

It does not.

self.assertTrue(future.done())
self.assertTrue(called1)
self.assertTrue(called2)
self.assertEqual('Sentinel Result', future.result())
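The test relies on `asyncio.sleep(0)` performing a single bare `yield`, which `_execute_coroutine_step` treats as "resume in the next spin". Driving the same coroutine by hand shows this. The sketch assumes CPython's current `asyncio.sleep` implementation, which does not need a running event loop when the delay is 0.

```python
import asyncio


async def coroutine() -> str:
    await asyncio.sleep(0)  # yields None once; no event loop is touched
    return 'Sentinel Result'


coro = coroutine()
first = coro.send(None)     # what the first spin_once() sees
print(first)                # None, so the task re-queues itself
try:
    coro.send(None)         # what the second spin_once() does
except StopIteration as stop:
    result = stop.value
print(result)               # Sentinel Result
```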

def test_create_task_coroutine_cancel(self) -> None:
self.assertIsNotNone(self.node.handle)
for cls in [SingleThreadedExecutor, EventsExecutor]:
@@ -319,6 +353,38 @@ async def coroutine() -> str:
self.assertTrue(future.cancelled())
self.assertEqual(None, future.result())

def test_create_task_coroutine_wake_from_another_thread(self) -> None:
self.assertIsNotNone(self.node.handle)

for cls in [SingleThreadedExecutor, MultiThreadedExecutor, EventsExecutor]:
with self.subTest(cls=cls):
executor = cls(context=self.context)
thread_future = executor.create_future()

async def coroutine():
await thread_future

def future_thread():
threading.Event().wait(0.1) # Simulate some work
thread_future.set_result(None)

t = threading.Thread(target=future_thread)

coroutine_future = executor.create_task(coroutine)

start_time = time.monotonic()

t.start()
executor.spin_until_future_complete(coroutine_future, timeout_sec=1.0)

end_time = time.monotonic()

self.assertTrue(coroutine_future.done())

# The coroutine should take at least 0.1 seconds to complete because it waits for
# the thread to set the future but nowhere near the 1 second timeout
assert 0.1 <= end_time - start_time < 0.2
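Why the elapsed time tracks the worker's delay rather than the 1-second timeout can be sketched with the standard library. In rclpy the wakeup comes from triggering a guard condition; here a blocking `queue.Queue.get(timeout=...)` stands in for it, and `WakeableExecutor` is a hypothetical class.

```python
import queue
import threading
import time
from typing import Callable


class WakeableExecutor:
    """Hypothetical executor whose wait blocks until work arrives or it times out.

    Putting work on the queue plays the role of triggering a guard condition.
    """

    def __init__(self) -> None:
        self._ready: 'queue.Queue[Callable[[], None]]' = queue.Queue()

    def call_in_next_spin(self, callback: Callable[[], None]) -> None:
        self._ready.put(callback)  # thread-safe; wakes a blocked spin_once()

    def spin_once(self, timeout_sec: float) -> bool:
        try:
            callback = self._ready.get(timeout=timeout_sec)
        except queue.Empty:
            return False  # timed out with no work
        callback()
        return True


executor = WakeableExecutor()
finished = threading.Event()


def worker() -> None:
    time.sleep(0.1)  # simulate work in another thread
    executor.call_in_next_spin(finished.set)  # "resume" the waiting task


start = time.monotonic()
thread = threading.Thread(target=worker)
thread.start()
executor.spin_once(timeout_sec=1.0)
elapsed = time.monotonic() - start
thread.join()
print(finished.is_set(), round(elapsed, 2))
```

Without such a wakeup, the waiting spin would sit out its full timeout before noticing that the future had completed.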

def test_create_task_normal_function(self) -> None:
self.assertIsNotNone(self.node.handle)
for cls in [SingleThreadedExecutor, EventsExecutor]:

25 changes: 0 additions & 25 deletions rclpy/test/test_task.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Any
from typing import Callable
from typing import List
@@ -54,30 +53,6 @@ def func() -> str:
self.assertTrue(t.done())
self.assertEqual('Sentinel Result', t.result())

def test_coroutine(self) -> None:
called1 = False
called2 = False

async def coro() -> str:
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True
return 'Sentinel Result'

t = Task(coro)
t()
self.assertTrue(called1)
self.assertFalse(called2)

called1 = False
t()
self.assertFalse(called1)
self.assertTrue(called2)
self.assertTrue(t.done())
self.assertEqual('Sentinel Result', t.result())

def test_done_callback_scheduled(self) -> None:
executor = DummyExecutor()
