From 25b7c234976d41096399f8fe45e2658ce47f0cda Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 25 Oct 2024 19:48:29 +0800 Subject: [PATCH 1/2] Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374) Signed-off-by: Barry Xu (cherry picked from commit c009b0de286101da1caedef860b88f1520edb97b) # Conflicts: # rclpy/rclpy/executors.py --- rclpy/rclpy/executors.py | 57 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index fd097a4f2..3893a77b9 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -321,6 +321,7 @@ def spin_until_future_complete( future.add_done_callback(lambda x: self.wake()) if timeout_sec is None or timeout_sec < 0: +<<<<<<< HEAD while ( self._context.ok() and not future.done() @@ -328,11 +329,16 @@ def spin_until_future_complete( and not self._is_shutdown ): self.spin_once_until_future_complete(future, timeout_sec) +======= + while self._context.ok() and not future.done() and not self._is_shutdown: + self._spin_once_until_future_complete(future, timeout_sec) +>>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374)) else: start = time.monotonic() end = start + timeout_sec timeout_left = TimeoutObject(timeout_sec) +<<<<<<< HEAD while ( self._context.ok() and not future.done() @@ -340,6 +346,10 @@ def spin_until_future_complete( and not self._is_shutdown ): self.spin_once_until_future_complete(future, timeout_left) +======= + while self._context.ok() and not future.done() and not self._is_shutdown: + self._spin_once_until_future_complete(future, timeout_left) +>>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374)) now = time.monotonic() if now >= end: @@ -377,6 +387,13 @@ def spin_once_until_future_complete( """ raise NotImplementedError() + def _spin_once_until_future_complete( + self, + future: Future, + timeout_sec: Optional[Union[float, TimeoutObject]] = None + ) -> None: + raise NotImplementedError() + def _take_timer(self, tmr): try: with tmr.handle: @@ -844,13 +861,20 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def _spin_once_until_future_complete( + self, + future: Future, + timeout_sec: Optional[Union[float, TimeoutObject]] = None + ) -> None: + self._spin_once_impl(timeout_sec, future.done) + def spin_once_until_future_complete( self, future: Future, timeout_sec: Optional[Union[float, TimeoutObject]] = None ) -> None: future.add_done_callback(lambda x: self.wake()) - self._spin_once_impl(timeout_sec, future.done) + self._spin_once_until_future_complete(future, timeout_sec) class MultiThreadedExecutor(Executor): @@ -916,10 +940,41 @@ def _spin_once_impl( def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) + def _spin_once_until_future_complete( + self, + future: Future, + timeout_sec: Optional[Union[float, TimeoutObject]] = None + ) -> None: + self._spin_once_impl(timeout_sec, future.done) + def spin_once_until_future_complete( self, future: Future, timeout_sec: Optional[Union[float, TimeoutObject]] = None ) -> None: future.add_done_callback(lambda x: self.wake()) +<<<<<<< HEAD self._spin_once_impl(timeout_sec, future.done) +======= + self._spin_once_until_future_complete(future, timeout_sec) + + def shutdown( + self, + timeout_sec: float = None, + *, + wait_for_threads: bool = True + ) -> bool: + """ + Stop executing callbacks and wait for their completion. + + :param timeout_sec: Seconds to wait. Block forever if ``None`` or negative. + Don't wait if 0. + :param wait_for_threads: If true, this function will block until all executor threads + have joined. + :return: ``True`` if all outstanding callbacks finished executing, or ``False`` if the + timeout expires before all outstanding work is done. + """ + success: bool = super().shutdown(timeout_sec) + self._executor.shutdown(wait=wait_for_threads) + return success +>>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374)) From 8aaae46569e652f4fed9d434a52b47ff7d08b54d Mon Sep 17 00:00:00 2001 From: Tomoya Fujita Date: Wed, 22 Oct 2025 15:12:01 +0900 Subject: [PATCH 2/2] resolve conflicts. Signed-off-by: Tomoya Fujita --- rclpy/rclpy/executors.py | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 3893a77b9..3e099d8fa 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -321,35 +321,25 @@ def spin_until_future_complete( future.add_done_callback(lambda x: self.wake()) if timeout_sec is None or timeout_sec < 0: -<<<<<<< HEAD while ( self._context.ok() and not future.done() and not future.cancelled() and not self._is_shutdown ): - self.spin_once_until_future_complete(future, timeout_sec) -======= - while self._context.ok() and not future.done() and not self._is_shutdown: self._spin_once_until_future_complete(future, timeout_sec) ->>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374)) else: start = time.monotonic() end = start + timeout_sec timeout_left = TimeoutObject(timeout_sec) -<<<<<<< HEAD while ( self._context.ok() and not future.done() and not future.cancelled() and not self._is_shutdown ): - self.spin_once_until_future_complete(future, timeout_left) -======= - while self._context.ok() and not future.done() and not self._is_shutdown: self._spin_once_until_future_complete(future, timeout_left) ->>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374)) now = time.monotonic() if now >= end: @@ -953,28 +943,4 @@ def spin_once_until_future_complete( timeout_sec: Optional[Union[float, TimeoutObject]] = None ) -> None: future.add_done_callback(lambda x: self.wake()) -<<<<<<< HEAD - self._spin_once_impl(timeout_sec, future.done) -======= self._spin_once_until_future_complete(future, timeout_sec) - - def shutdown( - self, - timeout_sec: float = None, - *, - wait_for_threads: bool = True - ) -> bool: - """ - Stop executing callbacks and wait for their completion. - - :param timeout_sec: Seconds to wait. Block forever if ``None`` or negative. - Don't wait if 0. - :param wait_for_threads: If true, this function will block until all executor threads - have joined. - :return: ``True`` if all outstanding callbacks finished executing, or ``False`` if the - timeout expires before all outstanding work is done. - """ - success: bool = super().shutdown(timeout_sec) - self._executor.shutdown(wait=wait_for_threads) - return success ->>>>>>> c009b0d (Avoid redundant done callbacks of the future while repeatedly calling spin_until_future_complete (#1374))