From 09fc152031b40bbc3540653c92a44b0ea78328e4 Mon Sep 17 00:00:00 2001 From: Hubert Liberacki Date: Wed, 30 Mar 2022 12:48:59 +0200 Subject: [PATCH] Replace spin_until_future_complete with spin_until_complete, add spin_for method * Deprecate spin_until_future_complete * Add spin_until_complete * Add spin_for method * Udpdate unit tests Signed-off-by: Hubert Liberacki --- rclpy/rclpy/__init__.py | 57 +++++++++++++++--- rclpy/rclpy/executors.py | 91 ++++++++++++++++++++++------ rclpy/test/test_action_client.py | 26 ++++---- rclpy/test/test_action_server.py | 54 ++++++++--------- rclpy/test/test_client.py | 6 +- rclpy/test/test_executor.py | 100 +++++++++++++++++++++++++++---- rclpy/test/test_qos_event.py | 2 +- rclpy/test/test_waitable.py | 2 +- 8 files changed, 259 insertions(+), 79 deletions(-) diff --git a/rclpy/rclpy/__init__.py b/rclpy/rclpy/__init__.py index cea1f9163..1a30d4b3b 100644 --- a/rclpy/rclpy/__init__.py +++ b/rclpy/rclpy/__init__.py @@ -33,7 +33,7 @@ After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on the node. The following functions can be used to process work that is waiting to be executed: :func:`spin`, -:func:`spin_once`, and :func:`spin_until_future_complete`. +:func:`spin_once`, and :func:`spin_until_complete`. When finished with a previously initialized :class:`.Context` (ie. done using all ROS nodes associated with the context), the :func:`shutdown` function should be called. @@ -224,6 +224,52 @@ def spin(node: 'Node', executor: 'Executor' = None) -> None: executor.remove_node(node) +def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None: + """ + Execute block until the context associated with the executor is shutdown or time duration pass. + + Callbacks will be executed by the provided executor. + + This function blocks. + + :param node: A node to add to the executor to check for work. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_once(duration_sec) + finally: + executor.remove_node(node) + + +def spin_until_complete( + node: 'Node', + condition, + executor: 'Executor' = None, + timeout_sec: float = None +) -> None: + """ + Execute work until the condition is complete. + + Callbacks and other work will be executed by the provided executor until ``condition()`` or + ``future.done()`` returns ``True`` or the context associated with the executor is shutdown. + + :param node: A node to add to the executor to check for work. + :param condition: The callable or future object to wait on. + :param executor: The executor to use, or the global executor if ``None``. + :param timeout_sec: Seconds to wait. Block until the condition is complete + if ``None`` or negative. Don't wait if 0. + """ + executor = get_global_executor() if executor is None else executor + try: + executor.add_node(node) + executor.spin_until_complete(condition, timeout_sec) + finally: + executor.remove_node(node) + + def spin_until_future_complete( node: 'Node', future: Future, @@ -241,10 +287,7 @@ def spin_until_future_complete( :param executor: The executor to use, or the global executor if ``None``. :param timeout_sec: Seconds to wait. Block until the future is complete if ``None`` or negative. Don't wait if 0. + + Deprecated in favor of spin_until_complete """ - executor = get_global_executor() if executor is None else executor - try: - executor.add_node(node) - executor.spin_until_future_complete(future, timeout_sec) - finally: - executor.remove_node(node) + spin_until_complete(node, future, executor, timeout_sec) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index dbaba882f..ac6576c87 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -151,7 +151,8 @@ def __init__(self, *, context: Context = None) -> None: self._nodes: Set[Node] = set() self._nodes_lock = RLock() # Tasks to be executed (oldest first) 3-tuple Task, Entity, Node - self._tasks: List[Tuple[Task, Optional[WaitableEntityType], Optional[Node]]] = [] + self._tasks: List[Tuple[Task, + Optional[WaitableEntityType], Optional[Node]]] = [] self._tasks_lock = Lock() # This is triggered when wait_for_ready_callbacks should rebuild the wait list self._guard = GuardCondition( @@ -278,21 +279,37 @@ def spin(self) -> None: while self._context.ok() and not self._is_shutdown: self.spin_once() - def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: - """Execute callbacks until a given future is done or a timeout occurs.""" - # Make sure the future wakes this executor when it is done - future.add_done_callback(lambda x: self.wake()) + def spin_for(self, duration_sec: float = None) -> None: + """Execute callbacks until shutdown, or timeout.""" + self.spin_until_complete(lambda: False, duration_sec) + + def spin_until_complete(self, condition, timeout_sec: float = None) -> None: + """ + Execute callbacks until a given condition is done or a timeout occurs. + + Deprecated in favor of spin_until_complete. + """ + # Common conditon for safisfying both Callable and Future + finish_condition = None + if (isinstance(condition, Future)): + # Make sure the future wakes this executor when it is done + condition.add_done_callback(lambda x: self.wake()) + def finish_condition(): return condition.done() + elif (callable(condition)): + def finish_condition(): return condition() + else: + raise TypeError("Condition has to be of Future or Callable type") if timeout_sec is None or timeout_sec < 0: - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_sec) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_sec) else: start = time.monotonic() end = start + timeout_sec timeout_left = timeout_sec - while self._context.ok() and not future.done() and not self._is_shutdown: - self.spin_once_until_future_complete(future, timeout_left) + while self._context.ok() and not finish_condition() and not self._is_shutdown: + self.spin_once_until_complete(condition, timeout_left) now = time.monotonic() if now >= end: @@ -300,6 +317,14 @@ def spin_until_future_complete(self, future: Future, timeout_sec: float = None) timeout_left = end - now + def spin_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + """ + Execute callbacks until a given future is done or a timeout occurs. + + Deprecated in favor of spin_until_complete. + """ + self.spin_until_complete(future, timeout_sec) + def spin_once(self, timeout_sec: float = None) -> None: """ Wait for and execute a single callback. @@ -311,6 +336,19 @@ def spin_once(self, timeout_sec: float = None) -> None: """ raise NotImplementedError() + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: + """ + Wait for and execute a single callback. + + This should behave in the same way as :meth:`spin_once`. + If needed by the implementation, it should awake other threads waiting. + + :param condition: The executor will wait until this condition is done. + :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. + Don't wait if 0. + """ + raise NotImplementedError() + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: """ Wait for and execute a single callback. @@ -321,6 +359,8 @@ def spin_once_until_future_complete(self, future: Future, timeout_sec: float = N :param future: The executor will wait until this future is done. :param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative. Don't wait if 0. + + Deprecated in favor of spin_once_until_complete. """ raise NotImplementedError() @@ -361,7 +401,8 @@ async def _execute_client(self, client, seq_and_response): def _take_service(self, srv): with srv.handle: - request_and_header = srv.handle.service_take_request(srv.srv_type.Request) + request_and_header = srv.handle.service_take_request( + srv.srv_type.Request) return request_and_header async def _execute_service(self, srv, request_and_header): @@ -422,7 +463,8 @@ async def handler(entity, gc, is_shutdown, work_tracker): # callback group can get executed gc.trigger() task = Task( - handler, (entity, self._guard, self._is_shutdown, self._work_tracker), + handler, (entity, self._guard, + self._is_shutdown, self._work_tracker), executor=self) with self._tasks_lock: self._tasks.append((task, entity, node)) @@ -458,7 +500,8 @@ def _wait_for_ready_callbacks( timeout_timer = None timeout_nsec = timeout_sec_to_nsec(timeout_sec) if timeout_nsec > 0: - timeout_timer = Timer(None, None, timeout_nsec, self._clock, context=self._context) + timeout_timer = Timer(None, None, timeout_nsec, + self._clock, context=self._context) yielded_work = False while not yielded_work and not self._is_shutdown and not condition(): @@ -479,7 +522,8 @@ def _wait_for_ready_callbacks( yield task, entity, node with self._tasks_lock: # Get rid of any tasks that are done - self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks)) + self._tasks = list( + filter(lambda t_e_n: not t_e_n[0].done(), self._tasks)) # Gather entities that can be waited on subscriptions: List[Subscription] = [] @@ -489,7 +533,8 @@ def _wait_for_ready_callbacks( services: List[Service] = [] waitables: List[Waitable] = [] for node in nodes_to_use: - subscriptions.extend(filter(self.can_execute, node.subscriptions)) + subscriptions.extend( + filter(self.can_execute, node.subscriptions)) timers.extend(filter(self.can_execute, node.timers)) clients.extend(filter(self.can_execute, node.clients)) services.extend(filter(self.can_execute, node.services)) @@ -701,7 +746,8 @@ def __init__(self, *, context: Context = None) -> None: def spin_once(self, timeout_sec: float = None) -> None: try: - handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) + handler, entity, node = self.wait_for_ready_callbacks( + timeout_sec=timeout_sec) except ShutdownException: pass except TimeoutException: @@ -711,9 +757,14 @@ def spin_once(self, timeout_sec: float = None) -> None: if handler.exception() is not None: raise handler.exception() - def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: self.spin_once(timeout_sec) + """Deprecated in favor of spin_once_until_complete""" + + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: + self.spin_once_until_complete(timeout_sec) + class MultiThreadedExecutor(Executor): """ @@ -756,5 +807,11 @@ def _spin_once_impl( def spin_once(self, timeout_sec: float = None) -> None: self._spin_once_impl(timeout_sec) + def spin_once_until_complete(self, condition, timeout_sec: float = None) -> None: + self._spin_once_impl(timeout_sec, condition if callable( + condition) else condition.done) + + """Deprecated in favor of spin_once_until_complete""" + def spin_once_until_future_complete(self, future: Future, timeout_sec: float = None) -> None: - self._spin_once_impl(timeout_sec, future.done) + self.spin_once_until_complete(timeout_sec, future.done) diff --git a/rclpy/test/test_action_client.py b/rclpy/test/test_action_client.py index 8ae4a151c..ba4b817e9 100644 --- a/rclpy/test/test_action_client.py +++ b/rclpy/test/test_action_client.py @@ -159,7 +159,7 @@ def test_send_goal_async(self): try: self.assertTrue(ac.wait_for_server(timeout_sec=2.0)) future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.done()) goal_handle = future.result() self.assertTrue(goal_handle.accepted) @@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback after goal has been accepted self.mock_action_server.publish_feedback(goal_uuid) @@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Check the feedback was not received self.assertEqual(self.feedback, None) @@ -220,14 +220,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=first_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Send another goal, but without a feedback callback second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes)) future = ac.send_goal_async( Fibonacci.Goal(), goal_uuid=second_goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for the second goal self.mock_action_server.publish_feedback(second_goal_uuid) @@ -251,7 +251,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self): Fibonacci.Goal(), feedback_callback=self.feedback_callback, goal_uuid=goal_uuid) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # Publish feedback for a non-existent goal ID self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes))) @@ -272,9 +272,9 @@ def test_send_goal_multiple(self): future_0 = ac.send_goal_async(Fibonacci.Goal()) future_1 = ac.send_goal_async(Fibonacci.Goal()) future_2 = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, future_0, executor) - rclpy.spin_until_future_complete(self.node, future_1, executor) - rclpy.spin_until_future_complete(self.node, future_2, executor) + rclpy.spin_until_complete(self.node, future_0, executor) + rclpy.spin_until_complete(self.node, future_1, executor) + rclpy.spin_until_complete(self.node, future_2, executor) self.assertTrue(future_0.done()) self.assertTrue(future_1.done()) self.assertTrue(future_2.done()) @@ -300,13 +300,13 @@ def test_send_cancel_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Cancel the goal cancel_future = goal_handle.cancel_goal_async() - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) self.assertTrue(cancel_future.done()) self.assertEqual( cancel_future.result().goals_canceling[0].goal_id, @@ -321,13 +321,13 @@ def test_get_result_async(self): # Send a goal goal_future = ac.send_goal_async(Fibonacci.Goal()) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertTrue(goal_future.done()) goal_handle = goal_future.result() # Get the goal result result_future = goal_handle.get_result_async() - rclpy.spin_until_future_complete(self.node, result_future, self.executor) + rclpy.spin_until_complete(self.node, result_future, self.executor) self.assertTrue(result_future.done()) finally: ac.destroy() diff --git a/rclpy/test/test_action_server.py b/rclpy/test/test_action_server.py index fa2af1e74..bfb1b6874 100644 --- a/rclpy/test/test_action_server.py +++ b/rclpy/test/test_action_server.py @@ -169,7 +169,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertTrue(future.result().accepted) self.assertTrue(handle_accepted_callback_triggered) action_server.destroy() @@ -199,7 +199,7 @@ def handle_accepted_callback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_msg.goal.order = goal_order future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) self.assertFalse(future.result().accepted) action_server.destroy() @@ -220,7 +220,7 @@ def goal_callback(goal): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future, self.executor) + rclpy.spin_until_complete(self.node, future, self.executor) # An invalid return type in the goal callback should translate to a rejected goal self.assertFalse(future.result().accepted) action_server.destroy() @@ -244,9 +244,9 @@ def test_multi_goal_accept(self): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) future2 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) - rclpy.spin_until_future_complete(self.node, future2, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future2, executor) self.assertTrue(future0.result().accepted) self.assertTrue(future1.result().accepted) @@ -270,8 +270,8 @@ def test_duplicate_goal(self): future0 = self.mock_action_client.send_goal(goal_msg) future1 = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, future0, executor) - rclpy.spin_until_future_complete(self.node, future1, executor) + rclpy.spin_until_complete(self.node, future0, executor) + rclpy.spin_until_complete(self.node, future1, executor) # Exactly one of the goals should be accepted self.assertNotEqual(future0.result().accepted, future1.result().accepted) @@ -306,7 +306,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -315,7 +315,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) assert all(cancel_result.goals_canceling[0].goal_id.uuid == goal_uuid.uuid) @@ -352,7 +352,7 @@ def cancel_callback(request): goal_msg.goal_id = goal_uuid goal_msg.goal.order = goal_order goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, executor) + rclpy.spin_until_complete(self.node, goal_future, executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) @@ -361,7 +361,7 @@ def cancel_callback(request): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, executor) + rclpy.spin_until_complete(self.node, cancel_future, executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 0) @@ -398,7 +398,7 @@ def execute_callback(gh): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) send_goal_response = goal_future.result() self.assertTrue(send_goal_response.accepted) self.assertIsNotNone(server_goal_handle) @@ -410,7 +410,7 @@ def execute_callback(gh): cancel_srv.goal_info.stamp.sec = 0 cancel_srv.goal_info.stamp.nanosec = 0 cancel_future = self.mock_action_client.cancel_goal(cancel_srv) - rclpy.spin_until_future_complete(self.node, cancel_future, self.executor) + rclpy.spin_until_complete(self.node, cancel_future, self.executor) cancel_result = cancel_future.result() self.assertEqual(len(cancel_result.goals_canceling), 1) @@ -421,7 +421,7 @@ def execute_callback(gh): # Get the result and exepect it to have canceled status get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result = get_result_future.result() self.assertEqual(result.status, GoalStatus.STATUS_CANCELED) self.assertEqual(server_goal_handle.status, GoalStatus.STATUS_CANCELED) @@ -447,12 +447,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_SUCCEEDED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -478,12 +478,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) self.assertEqual(result_response.result.sequence.tolist(), [1, 1, 2, 3, 5]) @@ -508,12 +508,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -537,12 +537,12 @@ def execute_callback(goal_handle): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = goal_uuid goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) goal_handle = goal_future.result() self.assertTrue(goal_handle.accepted) get_result_future = self.mock_action_client.get_result(goal_uuid) - rclpy.spin_until_future_complete(self.node, get_result_future, self.executor) + rclpy.spin_until_complete(self.node, get_result_future, self.executor) result_response = get_result_future.result() # Goal status should default to STATUS_ABORTED self.assertEqual(result_response.status, GoalStatus.STATUS_ABORTED) @@ -563,7 +563,7 @@ def test_expire_goals_none(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -586,7 +586,7 @@ def test_expire_goals_one(self): goal_msg = Fibonacci.Impl.SendGoalService.Request() goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertEqual(1, len(action_server._goal_handles)) @@ -642,7 +642,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete(self.node, goal_future, self.executor) + rclpy.spin_until_complete(self.node, goal_future, self.executor) self.assertIsNotNone(self.mock_action_client.feedback_msg) self.assertEqual( @@ -674,7 +674,7 @@ def execute_with_feedback(goal_handle): goal_msg.goal_id = UUID(uuid=list(uuid.uuid4().bytes)) goal_future = self.mock_action_client.send_goal(goal_msg) - rclpy.spin_until_future_complete( + rclpy.spin_until_complete( self.node, goal_future, self.executor) feedback_msg = self.mock_action_client.feedback_msg diff --git a/rclpy/test/test_client.py b/rclpy/test/test_client.py index 6846a0eee..fa33d1f3a 100644 --- a/rclpy/test/test_client.py +++ b/rclpy/test/test_client.py @@ -83,8 +83,8 @@ def test_concurrent_calls_to_service(self): future1 = cli.call_async(GetParameters.Request()) future2 = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, future1, executor=executor) - rclpy.spin_until_future_complete(self.node, future2, executor=executor) + rclpy.spin_until_complete(self.node, future1, executor=executor) + rclpy.spin_until_complete(self.node, future2, executor=executor) self.assertTrue(future1.result() is not None) self.assertTrue(future2.result() is not None) finally: @@ -132,7 +132,7 @@ def test_different_type_raises(self): future = cli.call_async(GetParameters.Request()) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) with self.assertRaises(TypeError): - rclpy.spin_until_future_complete(self.node, future, executor=executor) + rclpy.spin_until_complete(self.node, future, executor=executor) finally: self.node.destroy_client(cli) self.node.destroy_service(srv) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 9a10c6e4f..112977917 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -29,7 +29,8 @@ class TestExecutor(unittest.TestCase): def setUp(self): self.context = rclpy.context.Context() rclpy.init(context=self.context) - self.node = rclpy.create_node('TestExecutor', namespace='/rclpy', context=self.context) + self.node = rclpy.create_node( + 'TestExecutor', namespace='/rclpy', context=self.context) def tearDown(self): self.node.destroy_node() @@ -285,7 +286,8 @@ def spin_until_task_done(executor): break # Start spinning in a separate thread - thr = threading.Thread(target=spin_until_task_done, args=(executor, ), daemon=True) + thr = threading.Thread(target=spin_until_task_done, + args=(executor, ), daemon=True) thr.start() # Sleep in this thread to give the executor a chance to reach the loop in @@ -348,7 +350,23 @@ def test_executor_add_node(self): assert not executor.add_node(self.node) assert id(executor) == id(self.node.executor) - def test_executor_spin_until_future_complete_timeout(self): + def test_executor_spin_for(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.003, timer_callback) + + start = time.monotonic() + executor.spin_for(duration_sec=0.1) + end = time.monotonic() + self.assertGreaterEqual(end - start, 0.1) + + timer.cancel() + + def test_executor_spin_until_complete_timeout(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) @@ -361,16 +379,68 @@ def timer_callback(): future = Future() self.assertFalse(future.done()) start = time.monotonic() - executor.spin_until_future_complete(future=future, timeout_sec=0.1) + executor.spin_until_complete(future, timeout_sec=0.1) end = time.monotonic() # Nothing is ever setting the future, so this should have waited # at least 0.1 seconds. self.assertGreaterEqual(end - start, 0.1) self.assertFalse(future.done()) + start = time.monotonic() + executor.spin_until_complete(lambda: False, timeout_sec=0.1) + end = time.monotonic() + # condition is always false so this should have waited + # at least 0.1 seconds. + self.assertGreaterEqual(end - start, 0.1) + self.assertFalse(future.done()) + + timer.cancel() + + def test_executor_spin_until_complete_condition_done(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + executor.add_node(self.node) + + def timer_callback(): + pass + timer = self.node.create_timer(0.003, timer_callback) + + condition_var = False + + def set_condition(): + nonlocal condition_var + condition_var = True + + def condition(): + nonlocal condition_var + return condition_var + + # Condition complete timeout_sec > 0 + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=0.2) + self.assertTrue(condition()) + + # timeout_sec = None + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=None) + self.assertTrue(condition()) + + # Condition complete timeout < 0 + condition_var = False + self.assertFalse(condition()) + t = threading.Thread(target=lambda: set_condition()) + t.start() + executor.spin_until_complete(condition, timeout_sec=-1) + self.assertTrue(condition()) + timer.cancel() - def test_executor_spin_until_future_complete_future_done(self): + def test_executor_spin_until_complete_future_done(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) @@ -387,7 +457,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=0.2) + executor.spin_until_complete(future, timeout_sec=0.2) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -396,7 +466,7 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=None) + executor.spin_until_complete(future, timeout_sec=None) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') @@ -405,13 +475,13 @@ def set_future_result(future): self.assertFalse(future.done()) t = threading.Thread(target=lambda: set_future_result(future)) t.start() - executor.spin_until_future_complete(future=future, timeout_sec=-1) + executor.spin_until_complete(future, timeout_sec=-1) self.assertTrue(future.done()) self.assertEqual(future.result(), 'finished') timer.cancel() - def test_executor_spin_until_future_complete_do_not_wait(self): + def test_executor_spin_until_complete_do_not_wait(self): self.assertIsNotNone(self.node.handle) executor = SingleThreadedExecutor(context=self.context) executor.add_node(self.node) @@ -423,9 +493,19 @@ def timer_callback(): # Do not wait timeout_sec = 0 future = Future() self.assertFalse(future.done()) - executor.spin_until_future_complete(future=future, timeout_sec=0) + executor.spin_until_complete(future, timeout_sec=0) self.assertFalse(future.done()) + condition_var = False + + def condition(): + nonlocal condition_var + return condition_var + + self.assertFalse(condition()) + executor.spin_until_complete(condition, timeout_sec=0) + self.assertFalse(condition()) + timer.cancel() def test_executor_add_node_wakes_executor(self): diff --git a/rclpy/test/test_qos_event.py b/rclpy/test/test_qos_event.py index 4acfc1b90..39ecc12e8 100644 --- a/rclpy/test/test_qos_event.py +++ b/rclpy/test/test_qos_event.py @@ -175,7 +175,7 @@ def warn(self, message, once=False): EmptyMsg, self.topic_name, message_callback, qos_profile_subscription) executor = rclpy.executors.SingleThreadedExecutor(context=self.context) - rclpy.spin_until_future_complete(self.node, log_msgs_future, executor, 10.0) + rclpy.spin_until_complete(self.node, log_msgs_future, executor, 10.0) self.assertEqual( pub_log_msg, diff --git a/rclpy/test/test_waitable.py b/rclpy/test/test_waitable.py index f79841c51..cccdc3da1 100644 --- a/rclpy/test/test_waitable.py +++ b/rclpy/test/test_waitable.py @@ -298,7 +298,7 @@ def tearDownClass(cls): def start_spin_thread(self, waitable): waitable.future = Future(executor=self.executor) self.thr = threading.Thread( - target=self.executor.spin_until_future_complete, args=(waitable.future,), daemon=True) + target=self.executor.spin_until_complete, args=(waitable.future,), daemon=True) self.thr.start() return self.thr