Skip to content

Commit

Permalink
Added type hints to PortablePool
Browse files Browse the repository at this point in the history
  • Loading branch information
Casvt committed Dec 10, 2024
1 parent 4df4c6a commit e38d2df
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions backend/base/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from os import cpu_count, sep, symlink
from os.path import exists, join
from sys import base_exec_prefix, executable, platform, version_info
from typing import (Any, Collection, Dict, Generator, Iterable,
Iterator, List, Mapping, Sequence, Tuple, Union)
from typing import (TYPE_CHECKING, Any, Callable, Collection, Dict, Generator,
Iterable, Iterator, List, Mapping, Sequence, Tuple, Union)
from urllib.parse import unquote

from aiohttp import ClientError, ClientSession
Expand All @@ -23,6 +23,9 @@
from backend.base.definitions import Constants, T, U
from backend.base.logging import LOGGER

if TYPE_CHECKING:
from multiprocessing.pool import IMapIterator


def get_python_version() -> str:
"""Get python version as string
Expand Down Expand Up @@ -686,7 +689,12 @@ def __init__(
)
return

def apply(self, func, args=(), kwds={}):
def apply(
self,
func: Callable[..., U],
args: Iterable[Any] = (),
kwds: Mapping[str, Any] = {}
) -> U:
new_args = (func, args)
new_func = pool_apply_func
return super().apply(new_func, new_args, kwds)
Expand All @@ -706,17 +714,32 @@ def apply_async(
callback, error_callback
)

def map(self, func, iterable, chunksize=None):
def map(
self,
func: Callable[[T], U],
iterable: Iterable[T],
chunksize: Union[int, None] = None
) -> List[U]:
new_iterable = ((func, i) for i in iterable)
new_func = pool_map_func
return super().map(new_func, new_iterable, chunksize)

def imap(self, func, iterable, chunksize=1):
def imap(
self,
func: Callable[[T], U],
iterable: Iterable[T],
chunksize: Union[int, None] = 1
) -> IMapIterator[U]:
new_iterable = ((func, i) for i in iterable)
new_func = pool_map_func
return super().imap(new_func, new_iterable, chunksize)

def imap_unordered(self, func, iterable, chunksize=1):
def imap_unordered(
self,
func: Callable[[T], U],
iterable: Iterable[T],
chunksize: Union[int, None] = 1
) -> IMapIterator[U]:
new_iterable = ((func, i) for i in iterable)
new_func = pool_map_func
return super().imap_unordered(new_func, new_iterable, chunksize)
Expand All @@ -739,12 +762,22 @@ def map_async(
error_callback
)

def starmap(self, func, iterable, chunksize=None):
def starmap(
self,
func: Callable[..., U],
iterable: Iterable[Iterable[T]],
chunksize: Union[int, None] = None
) -> List[U]:
new_iterable = ((func, *i) for i in iterable)
new_func = pool_starmap_func
return super().starmap(new_func, new_iterable, chunksize)

def istarmap_unordered(self, func, iterable, chunksize=1):
def istarmap_unordered(
self,
func: Callable[..., U],
iterable: Iterable[Iterable[T]],
chunksize: Union[int, None] = 1
) -> IMapIterator[U]:
"A combination of starmap and imap_unordered."
new_iterable = ((func, i) for i in iterable)
new_func = pool_apply_func
Expand Down

0 comments on commit e38d2df

Please sign in to comment.