11from __future__ import annotations
22
3+ import sys
4+ import asyncio
35import functools
4- from typing import TypeVar , Callable , Awaitable
6+ import contextvars
7+ from typing import Any , TypeVar , Callable , Awaitable
58from typing_extensions import ParamSpec
69
7- import anyio
8- import anyio .to_thread
9-
10- from ._reflection import function_has_argument
11-
1210T_Retval = TypeVar ("T_Retval" )
1311T_ParamSpec = ParamSpec ("T_ParamSpec" )
1412
1513
16- # copied from `asyncer`, https://github.com/tiangolo/asyncer
17- def asyncify (
18- function : Callable [T_ParamSpec , T_Retval ],
19- * ,
20- cancellable : bool = False ,
21- limiter : anyio .CapacityLimiter | None = None ,
22- ) -> Callable [T_ParamSpec , Awaitable [T_Retval ]]:
23- """
24- Take a blocking function and create an async one that receives the same
25- positional and keyword arguments, and that when called, calls the original function
26- in a worker thread using `anyio.to_thread.run_sync()`. Internally,
27- `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
28- keyword arguments additional to positional arguments and it adds better support for
29- autocompletion and inline errors for the arguments of the function called and the
30- return value.
14+ if sys .version_info >= (3 , 9 ):
15+ to_thread = asyncio .to_thread
16+ else :
17+ async def _to_thread (
18+ func : Callable [T_ParamSpec , T_Retval ], / , * args : T_ParamSpec .args , ** kwargs : T_ParamSpec .kwargs
19+ ) -> Any :
20+ """Asynchronously run function *func* in a separate thread.
3121
32- If the `cancellable` option is enabled and the task waiting for its completion is
33- cancelled, the thread will still run its course but its return value (or any raised
34- exception) will be ignored.
22+ Any *args and **kwargs supplied for this function are directly passed
23+ to *func*. Also, the current :class:`contextvars.Context` is propagated,
24+ allowing context variables from the main thread to be accessed in the
25+ separate thread.
3526
36- Use it like this:
27+ Returns a coroutine that can be awaited to get the eventual result of *func*.
28+ """
29+ loop = asyncio .events .get_running_loop ()
30+ ctx = contextvars .copy_context ()
31+ func_call = functools .partial (ctx .run , func , * args , ** kwargs )
32+ return await loop .run_in_executor (None , func_call )
33+
34+ to_thread = _to_thread
35+
36+ # inspired by `asyncer`, https://github.com/tiangolo/asyncer
37+ def asyncify (function : Callable [T_ParamSpec , T_Retval ]) -> Callable [T_ParamSpec , Awaitable [T_Retval ]]:
38+ """
39+ Take a blocking function and create an async one that receives the same
40+ positional and keyword arguments. For python version 3.9 and above, it uses
41+ asyncio.to_thread to run the function in a separate thread. For python version
42+ 3.8, it uses locally defined copy of the asyncio.to_thread function which was
43+ introduced in python 3.9.
3744
38- ```Python
39- def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
40- # Do work
41- return "Some result"
45+ Usage:
4246
47+ ```python
48+ def blocking_func(arg1, arg2, kwarg1=None):
49+ # blocking code
50+ return result
4351
44- result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
45- print(result)
52+ result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1)
4653 ```
4754
4855 ## Arguments
4956
5057 `function`: a blocking regular callable (e.g. a function)
51- `cancellable`: `True` to allow cancellation of the operation
52- `limiter`: capacity limiter to use to limit the total amount of threads running
53- (if omitted, the default limiter is used)
5458
5559 ## Return
5660
@@ -60,22 +64,6 @@ def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
6064 """
6165
6266 async def wrapper (* args : T_ParamSpec .args , ** kwargs : T_ParamSpec .kwargs ) -> T_Retval :
63- partial_f = functools .partial (function , * args , ** kwargs )
64-
65- # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old
66- # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid
67- # surfacing deprecation warnings.
68- if function_has_argument (anyio .to_thread .run_sync , "abandon_on_cancel" ):
69- return await anyio .to_thread .run_sync (
70- partial_f ,
71- abandon_on_cancel = cancellable ,
72- limiter = limiter ,
73- )
74-
75- return await anyio .to_thread .run_sync (
76- partial_f ,
77- cancellable = cancellable ,
78- limiter = limiter ,
79- )
67+ return await to_thread (function , * args , ** kwargs )
8068
8169 return wrapper
0 commit comments