-
Notifications
You must be signed in to change notification settings - Fork 260
Fix issues with resuming async tasks awaiting a future #1469
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: rolling
Are you sure you want to change the base?
Changes from all commits
78c50be
428e4e7
defad88
19e9c0a
534acce
23e334f
6695847
c6f729c
2a35478
e39d310
458a73c
bde1872
faafa26
8c1bb36
be207ce
a79df94
6c19b8e
aa5da61
eab03e3
2eb1cfb
35be3cc
d88d94e
44b0f62
bf46550
91890f3
e3dd676
98a9b7a
bab3ba9
e0f21bf
a0d5592
5a99757
d8db620
5c0408e
991afa0
d27bf64
dfa0904
23f06e0
cf67589
9771f6f
5ce6fbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,10 +66,13 @@ def __del__(self) -> None: | |
| 'The following exception was never retrieved: ' + str(self._exception), | ||
| file=sys.stderr) | ||
|
|
||
| def __await__(self) -> Generator[None, None, Optional[T]]: | ||
| def __await__(self) -> Generator['Future[T]', None, Optional[T]]: | ||
| # Yield if the task is not finished | ||
| while self._pending(): | ||
| yield | ||
| if self._pending(): | ||
| # This tells the task to suspend until the future is done | ||
| yield self | ||
| if self._pending(): | ||
| raise RuntimeError('Future awaited a second time before it was done') | ||
| return self.result() | ||
|
|
||
| def _pending(self) -> bool: | ||
|
|
@@ -298,17 +301,7 @@ def __call__(self) -> None: | |
| self._executing = True | ||
|
|
||
| if inspect.iscoroutine(self._handler): | ||
| # Execute a coroutine | ||
| handler = self._handler | ||
| try: | ||
| handler.send(None) | ||
| except StopIteration as e: | ||
| # The coroutine finished; store the result | ||
| self.set_result(e.value) | ||
| self._complete_task() | ||
| except Exception as e: | ||
| self.set_exception(e) | ||
| self._complete_task() | ||
| self._execute_coroutine_step(self._handler) | ||
| else: | ||
| # Execute a normal function | ||
| try: | ||
|
|
@@ -322,6 +315,47 @@ def __call__(self) -> None: | |
| finally: | ||
| self._task_lock.release() | ||
|
|
||
| def _execute_coroutine_step(self, coro: Coroutine[Any, Any, T]) -> None: | ||
| """Execute or resume a coroutine task.""" | ||
| try: | ||
| result = coro.send(None) | ||
| except StopIteration as e: | ||
| # The coroutine finished; store the result | ||
| self.set_result(e.value) | ||
| self._complete_task() | ||
| except Exception as e: | ||
| # The coroutine raised; store the exception | ||
| self.set_exception(e) | ||
| self._complete_task() | ||
| else: | ||
| # The coroutine yielded; suspend the task until it is resumed | ||
| executor = self._executor() | ||
| if executor is None: | ||
| raise RuntimeError( | ||
| 'Task tried to reschedule but no executor was set: ' | ||
| 'tasks should only be initialized through executor.create_task()') | ||
| elif isinstance(result, Future): | ||
| # Schedule the task to resume when the future is done | ||
| self._add_resume_callback(result, executor) | ||
| elif result is None: | ||
| # The coroutine yielded None, schedule the task to resume in the next spin | ||
| executor._call_task_in_next_spin(self) | ||
| else: | ||
| raise TypeError( | ||
| f'Expected coroutine to yield a Future or None, got: {type(result)}') | ||
|
|
||
| def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None: | ||
| future_executor = future._executor() | ||
| if future_executor is None: | ||
| # The future is not associated with an executor yet, so associate it with ours | ||
| future._set_executor(executor) | ||
| elif future_executor is not executor: | ||
| raise RuntimeError('A task can only await futures associated with the same executor') | ||
|
|
||
| # The future is associated with the same executor, so we can resume the task directly | ||
| # in the done callback | ||
| future.add_done_callback(lambda _: self.__call__()) | ||
This comment was marked as outdated.
Sorry, something went wrong.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I follow. At the time the coroutine yields a future, we know that this future is still pending, so we tell the future to schedule the next call of the task to the executor when the future either finishes or is cancelled. A cancelled future will just return None in the next |
||
|
|
||
| def _complete_task(self) -> None: | ||
| """Cleanup after task finished.""" | ||
| self._handler = None | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.