|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import functools |
| 4 | +from typing import TypeVar, Callable, Awaitable |
| 5 | +from typing_extensions import ParamSpec |
| 6 | + |
| 7 | +import anyio |
| 8 | +import anyio.to_thread |
| 9 | + |
| 10 | +T_Retval = TypeVar("T_Retval") |
| 11 | +T_ParamSpec = ParamSpec("T_ParamSpec") |
| 12 | + |
| 13 | + |
| 14 | +# copied from `asyncer`, https://github.com/tiangolo/asyncer |
| 15 | +def asyncify( |
| 16 | + function: Callable[T_ParamSpec, T_Retval], |
| 17 | + *, |
| 18 | + cancellable: bool = False, |
| 19 | + limiter: anyio.CapacityLimiter | None = None, |
| 20 | +) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: |
| 21 | + """ |
| 22 | + Take a blocking function and create an async one that receives the same |
| 23 | + positional and keyword arguments, and that when called, calls the original function |
| 24 | + in a worker thread using `anyio.to_thread.run_sync()`. Internally, |
| 25 | + `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports |
| 26 | + keyword arguments additional to positional arguments and it adds better support for |
| 27 | + autocompletion and inline errors for the arguments of the function called and the |
| 28 | + return value. |
| 29 | +
|
| 30 | + If the `cancellable` option is enabled and the task waiting for its completion is |
| 31 | + cancelled, the thread will still run its course but its return value (or any raised |
| 32 | + exception) will be ignored. |
| 33 | +
|
| 34 | + Use it like this: |
| 35 | +
|
| 36 | + ```Python |
| 37 | + def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str: |
| 38 | + # Do work |
| 39 | + return "Some result" |
| 40 | +
|
| 41 | +
|
| 42 | + result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b") |
| 43 | + print(result) |
| 44 | + ``` |
| 45 | +
|
| 46 | + ## Arguments |
| 47 | +
|
| 48 | + `function`: a blocking regular callable (e.g. a function) |
| 49 | + `cancellable`: `True` to allow cancellation of the operation |
| 50 | + `limiter`: capacity limiter to use to limit the total amount of threads running |
| 51 | + (if omitted, the default limiter is used) |
| 52 | +
|
| 53 | + ## Return |
| 54 | +
|
| 55 | + An async function that takes the same positional and keyword arguments as the |
| 56 | + original one, that when called runs the same original function in a thread worker |
| 57 | + and returns the result. |
| 58 | + """ |
| 59 | + |
| 60 | + async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: |
| 61 | + partial_f = functools.partial(function, *args, **kwargs) |
| 62 | + return await anyio.to_thread.run_sync(partial_f, cancellable=cancellable, limiter=limiter) |
| 63 | + |
| 64 | + return wrapper |
0 commit comments