From e38d2dfd0777dbdd44603c37d74739776c2f8c42 Mon Sep 17 00:00:00 2001 From: CasVT Date: Tue, 10 Dec 2024 13:51:48 +0100 Subject: [PATCH] Added type hints to PortablePool --- backend/base/helpers.py | 49 ++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/backend/base/helpers.py b/backend/base/helpers.py index 4d18f76..f81eb33 100644 --- a/backend/base/helpers.py +++ b/backend/base/helpers.py @@ -11,8 +11,8 @@ from os import cpu_count, sep, symlink from os.path import exists, join from sys import base_exec_prefix, executable, platform, version_info -from typing import (Any, Collection, Dict, Generator, Iterable, - Iterator, List, Mapping, Sequence, Tuple, Union) +from typing import (TYPE_CHECKING, Any, Callable, Collection, Dict, Generator, + Iterable, Iterator, List, Mapping, Sequence, Tuple, Union) from urllib.parse import unquote from aiohttp import ClientError, ClientSession @@ -23,6 +23,9 @@ from backend.base.definitions import Constants, T, U from backend.base.logging import LOGGER +if TYPE_CHECKING: + from multiprocessing.pool import IMapIterator + def get_python_version() -> str: """Get python version as string @@ -686,7 +689,12 @@ def __init__( ) return - def apply(self, func, args=(), kwds={}): + def apply( + self, + func: Callable[..., U], + args: Iterable[Any] = (), + kwds: Mapping[str, Any] = {} + ) -> U: new_args = (func, args) new_func = pool_apply_func return super().apply(new_func, new_args, kwds) @@ -706,17 +714,32 @@ def apply_async( callback, error_callback ) - def map(self, func, iterable, chunksize=None): + def map( + self, + func: Callable[[T], U], + iterable: Iterable[T], + chunksize: Union[int, None] = None + ) -> List[U]: new_iterable = ((func, i) for i in iterable) new_func = pool_map_func return super().map(new_func, new_iterable, chunksize) - def imap(self, func, iterable, chunksize=1): + def imap( + self, + func: Callable[[T], U], + iterable: Iterable[T], + chunksize: Union[int, None] = 1 + ) -> IMapIterator[U]: new_iterable = ((func, i) for i in iterable) new_func = pool_map_func return super().imap(new_func, new_iterable, chunksize) - def imap_unordered(self, func, iterable, chunksize=1): + def imap_unordered( + self, + func: Callable[[T], U], + iterable: Iterable[T], + chunksize: Union[int, None] = 1 + ) -> IMapIterator[U]: new_iterable = ((func, i) for i in iterable) new_func = pool_map_func return super().imap_unordered(new_func, new_iterable, chunksize) @@ -739,12 +762,22 @@ def map_async( error_callback ) - def starmap(self, func, iterable, chunksize=None): + def starmap( + self, + func: Callable[..., U], + iterable: Iterable[Iterable[T]], + chunksize: Union[int, None] = None + ) -> List[U]: new_iterable = ((func, *i) for i in iterable) new_func = pool_starmap_func return super().starmap(new_func, new_iterable, chunksize) - def istarmap_unordered(self, func, iterable, chunksize=1): + def istarmap_unordered( + self, + func: Callable[..., U], + iterable: Iterable[Iterable[T]], + chunksize: Union[int, None] = 1 + ) -> IMapIterator[U]: "A combination of starmap and imap_unordered." new_iterable = ((func, i) for i in iterable) new_func = pool_apply_func