diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6d598931e..8a2a3ddf4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -115,7 +115,7 @@ jobs: if: ${{ matrix.executors == 'multiprocessing' }} run: | cd tests - PYTEST_EXECUTORS=multiprocessing,sequential,test_pickling,debug_sequential \ + PYTEST_EXECUTORS=multiprocessing,sequential,multiprocessing_with_pickling,sequential_with_pickling \ uv run --frozen python -m pytest -sv test_all.py test_multiprocessing.py - name: Run slurm tests diff --git a/cluster_tools/Changelog.md b/cluster_tools/Changelog.md index 79ca031f0..9b1d04fc3 100644 --- a/cluster_tools/Changelog.md +++ b/cluster_tools/Changelog.md @@ -10,10 +10,13 @@ For upgrade instructions, please check the respective *Breaking Changes* section [Commits](https://github.com/scalableminds/webknossos-libs/compare/v0.15.11...HEAD) ### Breaking Changes +- Removed the `map_unordered` function of executors. [#1193](https://github.com/scalableminds/webknossos-libs/pull/1193) ### Added ### Changed +- Deprecated the test_pickling and debug_sequential executor strategies. The strategies multiprocessing_with_pickling and sequential should be used instead. [#1193](https://github.com/scalableminds/webknossos-libs/pull/1193) +- The sequential executor strategy no longer uses multiprocessing functionality internally and instead executes functions sequentially and synchronously in the same process. 
[#1193](https://github.com/scalableminds/webknossos-libs/pull/1193) ### Fixed diff --git a/cluster_tools/cluster_tools/__init__.py b/cluster_tools/cluster_tools/__init__.py index 153e34f6e..31e6729a9 100644 --- a/cluster_tools/cluster_tools/__init__.py +++ b/cluster_tools/cluster_tools/__init__.py @@ -1,11 +1,12 @@ +import warnings from typing import Any, Literal, overload from cluster_tools.executor_protocol import Executor from cluster_tools.executors.dask import DaskExecutor -from cluster_tools.executors.debug_sequential import DebugSequentialExecutor from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor -from cluster_tools.executors.pickle_ import PickleExecutor +from cluster_tools.executors.multiprocessing_pickle import MultiprocessingPickleExecutor from cluster_tools.executors.sequential import SequentialExecutor +from cluster_tools.executors.sequential_pickle import SequentialPickleExecutor from cluster_tools.schedulers.cluster_executor import ( ClusterExecutor, # noqa: F401 `cluster_tools.schedulers.cluster_executor.ClusterExecutor` imported but unused; RemoteOutOfMemoryException, # noqa: F401 `cluster_tools.schedulers.cluster_executor.ClusterExecutor` imported but unused; @@ -16,7 +17,7 @@ from cluster_tools.schedulers.pbs import PBSExecutor from cluster_tools.schedulers.slurm import SlurmExecutor -# For backwards-compatibility: +# For backwards-compatibility, remove in version 2.0: WrappedProcessPoolExecutor = MultiprocessingExecutor @@ -77,6 +78,18 @@ def get_executor( ) -> MultiprocessingExecutor: ... +@overload +def get_executor( + environment: Literal["multiprocessing_with_pickling"], **kwargs: Any +) -> MultiprocessingPickleExecutor: ... + + +@overload +def get_executor( + environment: Literal["test_pickling"], **kwargs: Any +) -> MultiprocessingPickleExecutor: ... 
+ + @overload def get_executor( environment: Literal["sequential"], **kwargs: Any @@ -86,13 +99,13 @@ def get_executor( @overload def get_executor( environment: Literal["debug_sequential"], **kwargs: Any -) -> DebugSequentialExecutor: ... +) -> SequentialExecutor: ... @overload def get_executor( - environment: Literal["test_pickling"], **kwargs: Any -) -> PickleExecutor: ... + environment: Literal["sequential_with_pickling"], **kwargs: Any +) -> SequentialPickleExecutor: ... def get_executor(environment: str, **kwargs: Any) -> "Executor": @@ -116,8 +129,24 @@ def get_executor(environment: str, **kwargs: Any) -> "Executor": return MultiprocessingExecutor(**kwargs) elif environment == "sequential": return SequentialExecutor(**kwargs) - elif environment == "debug_sequential": - return DebugSequentialExecutor(**kwargs) + elif environment == "sequential_with_pickling": + return SequentialPickleExecutor(**kwargs) + elif environment == "multiprocessing_with_pickling": + return MultiprocessingPickleExecutor(**kwargs) elif environment == "test_pickling": - return PickleExecutor(**kwargs) + # For backwards-compatibility, remove in version 2.0: + warnings.warn( + "The test_pickling execution strategy is deprecated and will be removed in version 2.0. Use multiprocessing_with_pickling instead.", + DeprecationWarning, + stacklevel=2, + ) + return MultiprocessingPickleExecutor(**kwargs) + elif environment == "debug_sequential": + # For backwards-compatibility, remove in version 2.0: + warnings.warn( + "The debug_sequential execution strategy is deprecated and will be removed in version 2.0. 
Use sequential instead.", + DeprecationWarning, + stacklevel=2, + ) + return SequentialExecutor(**kwargs) raise Exception("Unknown executor: {}".format(environment)) diff --git a/cluster_tools/cluster_tools/executor_protocol.py b/cluster_tools/cluster_tools/executor_protocol.py index be3f129e5..bf5602fd2 100644 --- a/cluster_tools/cluster_tools/executor_protocol.py +++ b/cluster_tools/cluster_tools/executor_protocol.py @@ -20,7 +20,7 @@ class Executor(Protocol, ContextManager["Executor"]): @classmethod - def as_completed(cls, futures: List["Future[_T]"]) -> Iterator["Future[_T]"]: ... + def as_completed(cls, futures: List[Future[_T]]) -> Iterator[Future[_T]]: ... def submit( self, @@ -28,18 +28,14 @@ def submit( /, *args: _P.args, **kwargs: _P.kwargs, - ) -> "Future[_T]": ... - - def map_unordered( - self, fn: Callable[[_S], _T], args: Iterable[_S] - ) -> Iterator[_T]: ... + ) -> Future[_T]: ... def map_to_futures( self, fn: Callable[[_S], _T], args: Iterable[_S], output_pickle_path_getter: Optional[Callable[[_S], PathLike]] = None, - ) -> List["Future[_T]"]: ... + ) -> List[Future[_T]]: ... def map( self, @@ -49,6 +45,6 @@ def map( chunksize: Optional[int] = None, ) -> Iterator[_T]: ... - def forward_log(self, fut: "Future[_T]") -> _T: ... + def forward_log(self, fut: Future[_T]) -> _T: ... def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: ... 
diff --git a/cluster_tools/cluster_tools/executors/dask.py b/cluster_tools/cluster_tools/executors/dask.py index 859530af0..de656911b 100644 --- a/cluster_tools/cluster_tools/executors/dask.py +++ b/cluster_tools/cluster_tools/executors/dask.py @@ -162,7 +162,7 @@ def from_config( return cls(client, job_resources=job_resources) @classmethod - def as_completed(cls, futures: List["Future[_T]"]) -> Iterator["Future[_T]"]: + def as_completed(cls, futures: List[Future[_T]]) -> Iterator[Future[_T]]: from distributed import as_completed return as_completed(futures) @@ -172,7 +172,7 @@ def submit( # type: ignore[override] __fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs, - ) -> "Future[_T]": + ) -> Future[_T]: if "__cfut_options" in kwargs: output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[ "output_pickle_path" @@ -236,18 +236,6 @@ def check_resources( enrich_future_with_uncaught_warning(fut) return fut - def map_unordered(self, fn: Callable[[_S], _T], args: Iterable[_S]) -> Iterator[_T]: - futs: List["Future[_T]"] = self.map_to_futures(fn, args) - - # Return a separate generator to avoid that map_unordered - # is executed lazily (otherwise, jobs would be submitted - # lazily, as well). 
- def result_generator() -> Iterator: - for fut in self.as_completed(futs): - yield fut.result() - - return result_generator() - def map_to_futures( self, fn: Callable[[_S], _T], @@ -255,7 +243,7 @@ def map_to_futures( _S ], # TODO change: allow more than one arg per call # noqa FIX002 Line contains TODO output_pickle_path_getter: Optional[Callable[[_S], os.PathLike]] = None, - ) -> List["Future[_T]"]: + ) -> List[Future[_T]]: if output_pickle_path_getter is not None: futs = [ self.submit( # type: ignore[call-arg] @@ -283,7 +271,7 @@ def map( # type: ignore[override] chunksize = 1 return super().map(fn, iterables, timeout=timeout, chunksize=chunksize) - def forward_log(self, fut: "Future[_T]") -> _T: + def forward_log(self, fut: Future[_T]) -> _T: return fut.result() def handle_kill( diff --git a/cluster_tools/cluster_tools/executors/debug_sequential.py b/cluster_tools/cluster_tools/executors/debug_sequential.py deleted file mode 100644 index afa401af7..000000000 --- a/cluster_tools/cluster_tools/executors/debug_sequential.py +++ /dev/null @@ -1,44 +0,0 @@ -from concurrent.futures import Future -from pathlib import Path -from typing import Callable, TypeVar, cast - -from typing_extensions import ParamSpec - -from cluster_tools._utils.warning import enrich_future_with_uncaught_warning -from cluster_tools.executors.multiprocessing_ import CFutDict, MultiprocessingExecutor -from cluster_tools.executors.sequential import SequentialExecutor - -_T = TypeVar("_T") -_P = ParamSpec("_P") - - -class DebugSequentialExecutor(SequentialExecutor): - """ - Only use for debugging purposes. This executor does not spawn new processes for its jobs. Therefore, - setting breakpoint()'s should be possible without context-related problems. 
- """ - - def submit( # type: ignore[override] - self, - __fn: Callable[_P, _T], - *args: _P.args, - **kwargs: _P.kwargs, - ) -> "Future[_T]": - fut: "Future[_T]" = Future() - if "__cfut_options" in kwargs: - output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[ - "output_pickle_path" - ] - del kwargs["__cfut_options"] - result = MultiprocessingExecutor._execute_and_persist_function( - Path(output_pickle_path), - __fn, - *args, - **kwargs, - ) - else: - result = __fn(*args, **kwargs) - - fut.set_result(result) - enrich_future_with_uncaught_warning(fut) - return fut diff --git a/cluster_tools/cluster_tools/executors/multiprocessing_.py b/cluster_tools/cluster_tools/executors/multiprocessing_.py index f529affc7..434e5ebbe 100644 --- a/cluster_tools/cluster_tools/executors/multiprocessing_.py +++ b/cluster_tools/cluster_tools/executors/multiprocessing_.py @@ -1,13 +1,11 @@ import logging import multiprocessing import os -import tempfile from concurrent import futures from concurrent.futures import Future, ProcessPoolExecutor from functools import partial from multiprocessing.context import BaseContext from pathlib import Path -from shutil import rmtree from typing import ( Any, Callable, @@ -44,9 +42,8 @@ class CFutDict(TypedDict): class MultiprocessingExecutor(ProcessPoolExecutor): """ Wraps the ProcessPoolExecutor to add various features: - - map_to_futures and map_unordered method + - map_to_futures method - pickling of job's output (see output_pickle_path_getter and output_pickle_path) - - job submission via pickling to circumvent bug in python < 3.8 (see MULTIPROCESSING_VIA_IO_TMP_DIR) """ _mp_context: BaseContext @@ -87,7 +84,7 @@ def __init__( self._mp_logging_handler_pool = _MultiprocessingLoggingHandlerPool() @classmethod - def as_completed(cls, futs: List["Future[_T]"]) -> Iterator["Future[_T]"]: + def as_completed(cls, futs: List[Future[_T]]) -> Iterator[Future[_T]]: return futures.as_completed(futs) def submit( # type: ignore[override] @@ -95,7 
+92,7 @@ def submit( # type: ignore[override] __fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs, - ) -> "Future[_T]": + ) -> Future[_T]: if "__cfut_options" in kwargs: output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[ "output_pickle_path" @@ -104,15 +101,6 @@ def submit( # type: ignore[override] else: output_pickle_path = None - if os.environ.get("MULTIPROCESSING_VIA_IO"): - # If MULTIPROCESSING_VIA_IO is set, _submit_via_io is used to - # workaround size constraints in pythons multiprocessing - # implementation. Also see https://github.com/python/cpython/pull/10305/files - # This should be fixed in python 3.8 - submit_fn = self._submit_via_io - else: - submit_fn = super().submit # type: ignore[assignment] - # Depending on the start_method and output_pickle_path, setup functions may need to be # executed in the new process context, before the actual code is ran. # These wrapper functions consume their arguments from *args, **kwargs and assume @@ -144,7 +132,7 @@ def submit( # type: ignore[override] ), ) - fut = submit_fn(__fn, *args, **kwargs) + fut = super().submit(__fn, *args, **kwargs) enrich_future_with_uncaught_warning(fut) return fut @@ -160,53 +148,16 @@ def map( # type: ignore[override] chunksize = 1 return super().map(fn, iterables, timeout=timeout, chunksize=chunksize) - def _submit_via_io( - self, - __fn: Callable[_P, _T], - *args: _P.args, - **kwargs: _P.kwargs, - ) -> "Future[_T]": - opt_tmp_dir = os.environ.get("MULTIPROCESSING_VIA_IO_TMP_DIR") - if opt_tmp_dir is not None: - dirpath = tempfile.mkdtemp(dir=opt_tmp_dir) - else: - dirpath = tempfile.mkdtemp() - - output_pickle_path = Path(dirpath) / "jobdescription.pickle" - - with output_pickle_path.open("wb") as file: - pickling.dump((__fn, args, kwargs), file) - - future = super().submit( - MultiprocessingExecutor._execute_via_io, output_pickle_path - ) - - future.add_done_callback( - partial(MultiprocessingExecutor._remove_tmp_file, dirpath) - ) - - return future - - 
@staticmethod - def _remove_tmp_file(path: os.PathLike, _future: Future) -> None: - rmtree(path) - @staticmethod def _setup_logging_and_execute( multiprocessing_logging_setup_fn: Callable[[], None], - fn: Callable[_P, "Future[_T]"], + fn: Callable[_P, Future[_T]], *args: Any, **kwargs: Any, - ) -> "Future[_T]": + ) -> Future[_T]: multiprocessing_logging_setup_fn() return fn(*args, **kwargs) - @staticmethod - def _execute_via_io(serialized_function_info_path: os.PathLike) -> Any: - with open(serialized_function_info_path, "rb") as file: - (fn, args, kwargs) = pickling.load(file) - return fn(*args, **kwargs) - @staticmethod def _execute_and_persist_function( output_pickle_path: Path, @@ -231,18 +182,6 @@ def _execute_and_persist_function( pickling.dump((True, result), file) return result - def map_unordered(self, fn: Callable[_P, _T], args: Any) -> Iterator[_T]: - futs: List["Future[_T]"] = self.map_to_futures(fn, args) - - # Return a separate generator to avoid that map_unordered - # is executed lazily (otherwise, jobs would be submitted - # lazily, as well). - def result_generator() -> Iterator: - for fut in futures.as_completed(futs): - yield fut.result() - - return result_generator() - def map_to_futures( self, fn: Callable[[_S], _T], @@ -250,7 +189,7 @@ def map_to_futures( _S ], # TODO change: allow more than one arg per call #noqa: FIX002 Line contains TODO output_pickle_path_getter: Optional[Callable[[_S], os.PathLike]] = None, - ) -> List["Future[_T]"]: + ) -> List[Future[_T]]: if output_pickle_path_getter is not None: futs = [ self.submit( # type: ignore[call-arg] @@ -267,7 +206,7 @@ def map_to_futures( return futs - def forward_log(self, fut: "Future[_T]") -> _T: + def forward_log(self, fut: Future[_T]) -> _T: """ Similar to the cluster executor, this method Takes a future from which the log file is forwarded to the active process. This method blocks as long as the future is not done. 
diff --git a/cluster_tools/cluster_tools/executors/pickle_.py b/cluster_tools/cluster_tools/executors/multiprocessing_pickle.py similarity index 88% rename from cluster_tools/cluster_tools/executors/pickle_.py rename to cluster_tools/cluster_tools/executors/multiprocessing_pickle.py index 42442db31..a3204a6c0 100644 --- a/cluster_tools/cluster_tools/executors/pickle_.py +++ b/cluster_tools/cluster_tools/executors/multiprocessing_pickle.py @@ -7,8 +7,6 @@ from cluster_tools._utils import pickling from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor -# The module name includes a _-suffix to avoid name clashes with the standard library pickle module. - _T = TypeVar("_T") _P = ParamSpec("_P") _S = TypeVar("_S") @@ -27,7 +25,7 @@ def _pickle_identity_executor( return _pickle_identity(result) -class PickleExecutor(MultiprocessingExecutor): +class MultiprocessingPickleExecutor(MultiprocessingExecutor): """ The same as MultiprocessingExecutor, but always pickles input and output of the jobs. 
When using this executor for automated tests, it is ensured that using cluster executors in production @@ -40,7 +38,7 @@ def submit( # type: ignore[override] /, *args: _P.args, **kwargs: _P.kwargs, - ) -> "Future[_T]": + ) -> Future[_T]: (fn_pickled, args_pickled, kwargs_pickled) = _pickle_identity( (fn, args, kwargs) ) diff --git a/cluster_tools/cluster_tools/executors/sequential.py b/cluster_tools/cluster_tools/executors/sequential.py index d49340aa1..80f7eaf95 100644 --- a/cluster_tools/cluster_tools/executors/sequential.py +++ b/cluster_tools/cluster_tools/executors/sequential.py @@ -1,14 +1,23 @@ +from concurrent.futures import Future from multiprocessing.context import BaseContext -from typing import Any, Callable, Optional, Tuple +from pathlib import Path +from typing import Any, Callable, Optional, Tuple, TypeVar, cast -from cluster_tools.executors.multiprocessing_ import MultiprocessingExecutor +from typing_extensions import ParamSpec +from cluster_tools._utils.warning import enrich_future_with_uncaught_warning +from cluster_tools.executors.multiprocessing_ import CFutDict, MultiprocessingExecutor +_T = TypeVar("_T") +_P = ParamSpec("_P") + + +# Strictly speaking, this executor doesn't need to inherit from MultiprocessingExecutor +# but could inherit from futures.Executor instead. However, this would require to duplicate +# quite a few methods to adhere to the executor protocol (as_completed, map_to_futures, map, forward_log, shutdown). class SequentialExecutor(MultiprocessingExecutor): """ - The same as MultiprocessingExecutor, but always uses only one core. In essence, - this is a sequential executor approach, but it still makes use of the standard pool approach. - That way, switching between different executors should always work without any problems. + The same as MultiprocessingExecutor, but synchronous and uses only one core. 
""" def __init__( @@ -27,3 +36,28 @@ def __init__( initializer=initializer, initargs=initargs, ) + + def submit( # type: ignore[override] + self, + __fn: Callable[_P, _T], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> Future[_T]: + fut: Future[_T] = Future() + if "__cfut_options" in kwargs: + output_pickle_path = cast(CFutDict, kwargs["__cfut_options"])[ + "output_pickle_path" + ] + del kwargs["__cfut_options"] + result = MultiprocessingExecutor._execute_and_persist_function( + Path(output_pickle_path), + __fn, + *args, + **kwargs, + ) + else: + result = __fn(*args, **kwargs) + + fut.set_result(result) + enrich_future_with_uncaught_warning(fut) + return fut diff --git a/cluster_tools/cluster_tools/executors/sequential_pickle.py b/cluster_tools/cluster_tools/executors/sequential_pickle.py new file mode 100644 index 000000000..81506158b --- /dev/null +++ b/cluster_tools/cluster_tools/executors/sequential_pickle.py @@ -0,0 +1,39 @@ +from concurrent.futures import Future +from functools import partial +from typing import Callable, TypeVar + +from typing_extensions import ParamSpec + +from cluster_tools.executors.multiprocessing_pickle import ( + _pickle_identity, + _pickle_identity_executor, +) +from cluster_tools.executors.sequential import SequentialExecutor + +_T = TypeVar("_T") +_P = ParamSpec("_P") +_S = TypeVar("_S") + + +class SequentialPickleExecutor(SequentialExecutor): + """ + The same as SequentialExecutor, but always pickles input and output of the jobs. + When using this executor for automated tests, it is ensured that using cluster executors in production + won't provoke pickling-related problems. In contrast to the MultiprocessingPickleExecutor this executor + does not have multiprocessing overhead. 
+ """ + + def submit( + self, + __fn: Callable[_P, _T], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> Future[_T]: + (fn_pickled, args_pickled, kwargs_pickled) = _pickle_identity( + (__fn, args, kwargs) + ) + return super().submit( + partial(_pickle_identity_executor, fn_pickled), + *args_pickled, + **kwargs_pickled, + ) diff --git a/cluster_tools/cluster_tools/schedulers/cluster_executor.py b/cluster_tools/cluster_tools/schedulers/cluster_executor.py index b8ad1ab81..aeb27de3d 100644 --- a/cluster_tools/cluster_tools/schedulers/cluster_executor.py +++ b/cluster_tools/cluster_tools/schedulers/cluster_executor.py @@ -158,7 +158,7 @@ def __init__( self.metadata["logging_setup_fn"] = kwargs["logging_setup_fn"] @classmethod - def as_completed(cls, futs: List["Future[_T]"]) -> Iterator["Future[_T]"]: + def as_completed(cls, futs: List[Future[_T]]) -> Iterator[Future[_T]]: return futures.as_completed(futs) @classmethod @@ -217,7 +217,7 @@ def _start( workerid: str, job_count: Optional[int] = None, job_name: Optional[str] = None, - ) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]: + ) -> Tuple[List[Future[str]], List[Tuple[int, int]]]: """Start job(s) with the given worker ID and return IDs identifying the new job(s). The job should run ``python -m cfut.remote . @@ -246,7 +246,7 @@ def inner_submit( job_name: Optional[str] = None, additional_setup_lines: Optional[List[str]] = None, job_count: Optional[int] = None, - ) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]: + ) -> Tuple[List[Future[str]], List[Tuple[int, int]]]: pass def _maybe_mark_logs_for_cleanup(self, jobid: str) -> None: @@ -386,7 +386,7 @@ def submit( # type: ignore[override] __fn: Callable[_P, _T], *args: _P.args, **kwargs: _P.kwargs, - ) -> "Future[_T]": + ) -> Future[_T]: """ Submit a job to the pool. 
kwargs may contain __cfut_options which currently should look like: @@ -475,7 +475,7 @@ def map_to_futures( _S ], # TODO change: allow more than one arg per call # noqa FIX002 Line contains TODO output_pickle_path_getter: Optional[Callable[[_S], os.PathLike]] = None, - ) -> List["Future[_T]"]: + ) -> List[Future[_T]]: self.ensure_not_shutdown() args = list(args) if len(args) == 0: @@ -567,7 +567,7 @@ def register_jobs( should_keep_output: bool, job_index_offset: int, batch_description: str, - jobid_future: "Future[str]", + jobid_future: Future[str], ) -> None: jobid = jobid_future.result() if self.debug: @@ -657,18 +657,7 @@ def result_generator() -> Iterator[_T]: return result_generator() - def map_unordered(self, fn: Callable[_P, _T], args: Any) -> Iterator[_T]: - futs = self.map_to_futures(fn, args) - - # Return a separate generator to avoid that map_unordered - # is executed lazily. - def result_generator() -> Iterator[_T]: - for fut in futures.as_completed(futs): - yield fut.result() - - return result_generator() - - def forward_log(self, fut: "Future[_T]") -> _T: + def forward_log(self, fut: Future[_T]) -> _T: """ Takes a future from which the log file is forwarded to the active process. This method blocks as long as the future is not done. 
diff --git a/cluster_tools/cluster_tools/schedulers/kube.py b/cluster_tools/cluster_tools/schedulers/kube.py index e52be82bf..d4e96ba51 100644 --- a/cluster_tools/cluster_tools/schedulers/kube.py +++ b/cluster_tools/cluster_tools/schedulers/kube.py @@ -155,7 +155,7 @@ def inner_submit( job_name: Optional[str] = None, additional_setup_lines: Optional[List[str]] = None, # noqa: ARG002 Unused method argument: `additional_setup_lines` job_count: Optional[int] = None, - ) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]: + ) -> Tuple[List[Future[str]], List[Tuple[int, int]]]: """Starts a Kubernetes pod that runs the specified shell command line.""" import kubernetes.client.models as kubernetes_models @@ -164,7 +164,7 @@ def inner_submit( self.ensure_kubernetes_namespace() job_id = str(uuid4()) - job_id_future: "Future[str]" = Future() + job_id_future: Future[str] = Future() job_id_future.set_result(job_id) job_id_futures = [job_id_future] diff --git a/cluster_tools/cluster_tools/schedulers/pbs.py b/cluster_tools/cluster_tools/schedulers/pbs.py index c1bdc973e..cfb030c4f 100644 --- a/cluster_tools/cluster_tools/schedulers/pbs.py +++ b/cluster_tools/cluster_tools/schedulers/pbs.py @@ -106,7 +106,7 @@ def inner_submit( job_name: Optional[str] = None, additional_setup_lines: Optional[List[str]] = None, job_count: Optional[int] = None, - ) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]: + ) -> Tuple[List[Future[str]], List[Tuple[int, int]]]: """Starts a PBS job that runs the specified shell command line.""" if additional_setup_lines is None: additional_setup_lines = [] @@ -151,7 +151,7 @@ def inner_submit( ] job_id = self.submit_text("\n".join(script_lines)) - job_id_future: "Future[str]" = Future() + job_id_future: Future[str] = Future() job_id_future.set_result(job_id) return [job_id_future], [(0, job_count or 1)] diff --git a/cluster_tools/cluster_tools/schedulers/slurm.py b/cluster_tools/cluster_tools/schedulers/slurm.py index 8c72eecf6..b16ea3938 100644 --- 
a/cluster_tools/cluster_tools/schedulers/slurm.py +++ b/cluster_tools/cluster_tools/schedulers/slurm.py @@ -295,7 +295,7 @@ def inner_submit( job_name: Optional[str] = None, additional_setup_lines: Optional[List[str]] = None, job_count: Optional[int] = None, - ) -> Tuple[List["Future[str]"], List[Tuple[int, int]]]: + ) -> Tuple[List[Future[str]], List[Tuple[int, int]]]: """Starts a Slurm job that runs the specified shell command line.""" if additional_setup_lines is None: additional_setup_lines = [] @@ -321,7 +321,7 @@ def inner_submit( batch_size = max(min(max_array_size, max_submit_jobs), 1) scripts = [] - job_id_futures: List["Future[str]"] = [] + job_id_futures: List[Future[str]] = [] ranges = [] number_of_jobs = job_count if job_count is not None else 1 for job_index_start in range(0, number_of_jobs, batch_size): @@ -543,7 +543,7 @@ def __init__( self, scripts: List[str], job_sizes: List[int], - futures: List["Future[str]"], + futures: List[Future[str]], cfut_dir: str, ): super().__init__() diff --git a/cluster_tools/pyproject.toml b/cluster_tools/pyproject.toml index 2a38d5ce9..61a3bd73f 100644 --- a/cluster_tools/pyproject.toml +++ b/cluster_tools/pyproject.toml @@ -22,7 +22,7 @@ dev-dependencies = [ "typing-extensions ~=4.12.0", "icecream ~=2.1.1", "mypy ~=1.0.0", - "pytest ~=7.2.1", + "pytest ~=8.3.3", "ruff ~=0.5.0", ] diff --git a/cluster_tools/tests/test_all.py b/cluster_tools/tests/test_all.py index 4d4ca8feb..4a6d9da09 100644 --- a/cluster_tools/tests/test_all.py +++ b/cluster_tools/tests/test_all.py @@ -5,7 +5,7 @@ from enum import Enum from functools import partial from pathlib import Path -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Any, Literal, Optional, Union import pytest @@ -35,63 +35,95 @@ def raise_if(msg: str, _bool: bool) -> None: raise Exception("raise_if was called with True: {}".format(msg)) -def get_executors(with_debug_sequential: bool = False) -> List[cluster_tools.Executor]: - global 
_dask_cluster +# Most of the specs in this module should be executed with multiple executors. Some tests +# should be called with all executors (including the pickling variants) and some with a subset (i.e., without the pickling variants). +# In order to avoid redundant parameterization of each test, pytest_generate_tests is defined here. +# If a spec uses an `exc_with_pickling` fixture (defined as a function parameter), that test is automatically parameterized with all executors. Analoguous, parameterization happens with `exc`. +# Regarding how this works in details: This function is called for each test and has access to the fixtures supplied +# to the test and most importantly can parametrize those fixtures. +def pytest_generate_tests(metafunc: Any) -> None: + if "exc" in metafunc.fixturenames or "exc_with_pickling" in metafunc.fixturenames: + with_pickling = "exc_with_pickling" in metafunc.fixturenames + executor_keys = get_executor_keys(with_pickling) + metafunc.parametrize( + "exc_with_pickling" if with_pickling else "exc", + executor_keys, + indirect=True, + ) + + +@pytest.fixture +def exc( + request: Any, +) -> cluster_tools.Executor: + return get_executor(request.param) + + +@pytest.fixture +def exc_with_pickling( + request: Any, +) -> cluster_tools.Executor: + return get_executor(request.param) + + +def get_executor_keys(with_pickling: bool = False) -> set[str]: executor_keys = { "slurm", "kubernetes", "dask", "multiprocessing", "sequential", - "test_pickling", } - if with_debug_sequential: - executor_keys.add("debug_sequential") + + if with_pickling: + executor_keys.add("multiprocessing_with_pickling") + executor_keys.add("sequential_with_pickling") if "PYTEST_EXECUTORS" in os.environ: executor_keys = executor_keys.intersection( os.environ["PYTEST_EXECUTORS"].split(",") ) - executors: List[cluster_tools.Executor] = [] - if "slurm" in executor_keys: - executors.append( - cluster_tools.get_executor( - "slurm", debug=True, job_resources={"mem": "100M"} - ) 
+ return executor_keys + + +def get_executor(environment: str) -> cluster_tools.Executor: + global _dask_cluster + + if environment == "slurm": + return cluster_tools.get_executor( + "slurm", debug=True, job_resources={"mem": "100M"} ) - if "kubernetes" in executor_keys: - executors.append( - cluster_tools.get_executor( - "kubernetes", - debug=True, - job_resources={ - "memory": "1G", - "image": "scalableminds/cluster-tools:latest", - }, - ) + if environment == "kubernetes": + return cluster_tools.get_executor( + "kubernetes", + debug=True, + job_resources={ + "memory": "1G", + "image": "scalableminds/cluster-tools:latest", + }, ) - if "multiprocessing" in executor_keys: - executors.append(cluster_tools.get_executor("multiprocessing", max_workers=5)) - if "sequential" in executor_keys: - executors.append(cluster_tools.get_executor("sequential")) - if "dask" in executor_keys: + if environment == "multiprocessing": + return cluster_tools.get_executor("multiprocessing", max_workers=5) + if environment == "sequential": + return cluster_tools.get_executor("sequential") + if environment == "dask": if not _dask_cluster: from distributed import LocalCluster, Worker _dask_cluster = LocalCluster( worker_class=Worker, resources={"mem": 20e9, "cpus": 4}, nthreads=6 ) - executors.append( - cluster_tools.get_executor("dask", job_resources={"address": _dask_cluster}) + return cluster_tools.get_executor( + "dask", job_resources={"address": _dask_cluster} ) - if "test_pickling" in executor_keys: - executors.append(cluster_tools.get_executor("test_pickling")) - if "pbs" in executor_keys: - executors.append(cluster_tools.get_executor("pbs")) - if "debug_sequential" in executor_keys: - executors.append(cluster_tools.get_executor("debug_sequential")) - return executors + if environment == "multiprocessing_with_pickling": + return cluster_tools.get_executor("multiprocessing_with_pickling") + if environment == "pbs": + return cluster_tools.get_executor("pbs") + if environment == 
"sequential_with_pickling": + return cluster_tools.get_executor("sequential_with_pickling") + raise RuntimeError("No executor specified.") @pytest.mark.skip( @@ -125,13 +157,15 @@ def maybe_negate(b: bool) -> bool: # In the following 4 cases we check whether there is a/no warning when using # map/submit with/without checking the futures. - for exc in get_executors(): + for exc_key in get_executor_keys(): + exc = get_executor(exc_key) marker = "map-expect-warning" with exc: exc.map(partial(raise_if, marker), cases) expect_marker(marker, "There should be a warning for an uncaught Future in map") - for exc in get_executors(): + for exc_key in get_executor_keys(): + exc = get_executor(exc_key) marker = "map-dont-expect-warning" with exc: try: @@ -142,7 +176,8 @@ def maybe_negate(b: bool) -> bool: marker, "There should be no warning for an uncaught Future in map", False ) - for exc in get_executors(): + for exc_key in get_executor_keys(): + exc = get_executor(exc_key) marker = "submit-expect-warning" with exc: futures = [exc.submit(partial(raise_if, marker), b) for b in cases] @@ -150,7 +185,8 @@ def maybe_negate(b: bool) -> bool: marker, "There should be no warning for an uncaught Future in submit" ) - for exc in get_executors(): + for exc_key in get_executor_keys(): + exc = get_executor(exc_key) marker = "submit-dont-expect-warning" with exc: futures = [exc.submit(partial(raise_if, marker), b) for b in cases] @@ -166,174 +202,158 @@ def maybe_negate(b: bool) -> bool: logger.removeHandler(fh) -def test_submit() -> None: - def run_square_numbers(executor: cluster_tools.Executor) -> None: - with executor: - job_count = 3 - job_range = range(job_count) - futures = [executor.submit(square, n) for n in job_range] - for future, job_index in zip(futures, job_range): - assert future.result() == square(job_index) - - for exc in get_executors(with_debug_sequential=True): - run_square_numbers(exc) +def test_submit(exc_with_pickling: cluster_tools.Executor) -> None: + exc = 
exc_with_pickling + with exc: + job_count = 3 + job_range = range(job_count) + futures = [exc.submit(square, n) for n in job_range] + for future, job_index in zip(futures, job_range): + assert future.result() == square(job_index) def get_pid() -> int: return os.getpid() -def test_process_id() -> None: +def test_process_id(exc_with_pickling: cluster_tools.Executor) -> None: + exc = exc_with_pickling outer_pid = os.getpid() - def compare_pids(executor: cluster_tools.Executor) -> None: - with executor: - future = executor.submit(get_pid) - inner_pid = future.result() - - should_differ = not isinstance(exc, cluster_tools.DebugSequentialExecutor) + with exc: + future = exc.submit(get_pid) + inner_pid = future.result() - if should_differ: - assert ( - inner_pid != outer_pid - ), f"Inner and outer pid should differ, but both are {inner_pid}." - else: - assert ( - inner_pid == outer_pid - ), f"Inner and outer pid should be equal, but {inner_pid} != {outer_pid}." + should_differ = not isinstance( + exc, + ( + cluster_tools.SequentialExecutor, + cluster_tools.SequentialPickleExecutor, + ), + ) - for exc in get_executors(with_debug_sequential=True): - compare_pids(exc) + if should_differ: + assert ( + inner_pid != outer_pid + ), f"Inner and outer pid should differ, but both are {inner_pid}." + else: + assert ( + inner_pid == outer_pid + ), f"Inner and outer pid should be equal, but {inner_pid} != {outer_pid}." + + +def test_unordered_sleep(exc: cluster_tools.Executor) -> None: + is_async = not isinstance( + exc, + ( + cluster_tools.SequentialExecutor, + cluster_tools.SequentialPickleExecutor, + ), + ) + + with exc: + durations = [5, 0] + futures = [exc.submit(sleep, n) for n in durations] + # For synchronous executors, the futures should be completed after submit returns. + # .as_completed() would return them in reverse order in that case. 
+        completed_futures = exc.as_completed(futures) if is_async else futures
+        results = [f.result() for f in completed_futures]
+
+        if is_async:
+            # For asynchronous executors, the jobs that sleep less should complete first
+            durations.sort()
+
+        assert durations == results
+
+
+def test_map_to_futures(exc: cluster_tools.Executor) -> None:
+    is_async = not isinstance(
+        exc,
+        (
+            cluster_tools.SequentialExecutor,
+            cluster_tools.SequentialPickleExecutor,
+        ),
+    )
+
+    with exc:
+        durations = [5, 0]
+        futures = exc.map_to_futures(sleep, durations)
+        # For synchronous executors, the futures should be completed after map_to_futures returns.
+        # .as_completed() would return them in reverse order in that case.
+        completed_futures = exc.as_completed(futures) if is_async else futures
+        results = [f.result() for f in completed_futures]
 
+        if is_async:
+            # For asynchronous executors, the jobs that sleep less should complete first
+            durations.sort()
 
-def test_unordered_sleep() -> None:
-    """Get host identifying information about the servers running
-    our jobs. 
- """ - for exc in get_executors(): - with exc: - durations = [10, 5] - futures = [exc.submit(sleep, n) for n in durations] - if not isinstance(exc, cluster_tools.SequentialExecutor): - durations.sort() - for duration, future in zip(durations, exc.as_completed(futures)): - assert future.result() == duration + assert durations == results -def test_unordered_map() -> None: - for exc in get_executors(): - with exc: - durations = [15, 1] - results_gen = exc.map_unordered(sleep, durations) - results = list(results_gen) +def test_empty_map_to_futures(exc: cluster_tools.Executor) -> None: + with exc: + futures = exc.map_to_futures(sleep, []) + results = [f.result() for f in futures] + assert len(results) == 0 - if not isinstance(exc, cluster_tools.SequentialExecutor): - durations.sort() - for duration, result in zip(durations, results): - assert result == duration +def output_pickle_path_getter(tmp_dir: str, chunk: int) -> Path: + return Path(tmp_dir) / f"test_{chunk}.pickle" -def test_map_to_futures() -> None: - for exc in get_executors(): +def test_map_to_futures_with_pickle_paths( + exc_with_pickling: cluster_tools.Executor, +) -> None: + exc = exc_with_pickling + with tempfile.TemporaryDirectory(dir=".") as tmp_dir: with exc: - durations = [15, 1] - futures = exc.map_to_futures(sleep, durations) - results = [] - - for i, duration in enumerate(exc.as_completed(futures)): - results.append(duration.result()) - - if not isinstance(exc, cluster_tools.SequentialExecutor): - durations.sort() + numbers = [2, 1] + futures = exc.map_to_futures( + square, + numbers, + output_pickle_path_getter=partial(output_pickle_path_getter, tmp_dir), + ) + results = [f.result() for f in exc.as_completed(futures)] + assert set(results) == {1, 4} - for duration_, result in zip(durations, results): - assert result == duration_ + for number in numbers: + assert Path( + output_pickle_path_getter(tmp_dir, number) + ).exists(), f"File for chunk {number} should exist." 
-def test_empty_map_to_futures() -> None: - for exc in get_executors(): +def test_submit_with_pickle_paths(exc: cluster_tools.Executor) -> None: + with tempfile.TemporaryDirectory(dir=".") as tmp_dir: with exc: - futures = exc.map_to_futures(sleep, []) - results = [f.result() for f in futures] - assert len(results) == 0 - - -def output_pickle_path_getter(tmp_dir: str, chunk: int) -> Path: - return Path(tmp_dir) / f"test_{chunk}.pickle" - + job_count = 3 + job_range = range(job_count) -def test_map_to_futures_with_pickle_paths() -> None: - for exc in get_executors(with_debug_sequential=True): - with tempfile.TemporaryDirectory(dir=".") as tmp_dir: - with exc: - durations = [2, 1] - futures = exc.map_to_futures( - sleep, - durations, - output_pickle_path_getter=partial( - output_pickle_path_getter, tmp_dir - ), + futures = [] + for n in job_range: + output_path = Path(tmp_dir) / f"{n}.pickle" + cfut_options = {"output_pickle_path": output_path} + futures.append( + exc.submit(square, n, __cfut_options=cfut_options) # type: ignore[call-arg] ) - results = [] - - for i, duration in enumerate(exc.as_completed(futures)): - results.append(duration.result()) - - assert 2 in results - assert 1 in results - - for duration_ in durations: - assert Path( - output_pickle_path_getter(tmp_dir, duration_) - ).exists(), f"File for chunk {duration_} should exist." 
+ for future, job_index in zip(futures, job_range): + assert future.result() == square(job_index) -def test_submit_with_pickle_paths() -> None: - for idx, exc in enumerate(get_executors()): - with tempfile.TemporaryDirectory(dir=".") as tmp_dir: - - def run_square_numbers(idx: int, executor: cluster_tools.Executor) -> Path: - with executor: - job_count = 3 - job_range = range(job_count) - - futures = [] - for n in job_range: - output_path = Path(tmp_dir) / f"{idx}_{n}.pickle" - cfut_options = {"output_pickle_path": output_path} - futures.append( - executor.submit(square, n, __cfut_options=cfut_options) # type: ignore[call-arg] - ) - - for future, job_index in zip(futures, job_range): - assert future.result() == square(job_index) - return output_path - - output_path = run_square_numbers(idx, exc) - assert output_path.exists(), "Output pickle file should exist." - + assert output_path.exists(), "Output pickle file should exist." -def test_map() -> None: - def run_map(executor: cluster_tools.Executor) -> None: - with executor: - result = list(executor.map(square, [2, 3, 4])) - assert result == [4, 9, 16] - for exc in get_executors(): - run_map(exc) +def test_map(exc: cluster_tools.Executor) -> None: + with exc: + result = list(exc.map(square, [2, 3, 4])) + assert result == [4, 9, 16] -def test_map_lazy() -> None: - def run_map(executor: cluster_tools.Executor) -> None: - with executor: - result = executor.map(square, [2, 3, 4]) +def test_map_lazy(exc: cluster_tools.Executor) -> None: + if not isinstance(exc, cluster_tools.DaskExecutor): + with exc: + result = exc.map(square, [2, 3, 4]) assert list(result) == [4, 9, 16] - for exc in get_executors(): - if not isinstance(exc, cluster_tools.DaskExecutor): - run_map(exc) - def test_executor_args() -> None: def pass_with(exc: cluster_tools.Executor) -> None: @@ -350,27 +370,27 @@ class DummyEnum(Enum): PEAR = 2 -def enum_consumer(value: DummyEnum) -> None: +def enum_consumer(value: DummyEnum) -> DummyEnum: assert value == 
DummyEnum.BANANA + return value -def test_cloudpickle_serialization() -> None: - enum_consumer_inner = enum_consumer - - for fn in [enum_consumer, enum_consumer_inner]: - try: - with cluster_tools.get_executor("test_pickling") as executor: - _fut = executor.submit(fn, DummyEnum.BANANA) - assert fn == enum_consumer - except Exception: # noqa: PERF203 `try`-`except` within a loop incurs performance overhead - assert fn != enum_consumer - - assert True - - -def test_map_to_futures_with_debug_sequential() -> None: - with cluster_tools.get_executor("debug_sequential") as exc: - durations = [4, 1] +@pytest.mark.parametrize( + "executor_key", ["multiprocessing_with_pickling", "sequential_with_pickling"] +) +def test_pickling( + executor_key: Union[ + Literal["multiprocessing_with_pickling"], Literal["sequential_with_pickling"] + ], +) -> None: + with cluster_tools.get_executor(executor_key) as executor: + future = executor.submit(enum_consumer, DummyEnum.BANANA) + assert future.result() == DummyEnum.BANANA + + +def test_map_to_futures_with_sequential() -> None: + with cluster_tools.get_executor("sequential") as exc: + durations = [1, 0] futures = exc.map_to_futures(sleep, durations) for fut in futures: @@ -378,9 +398,7 @@ def test_map_to_futures_with_debug_sequential() -> None: fut.done() ), "Future should immediately be finished after map_to_futures has returned" - results = [] - for i, duration in enumerate(futures): - results.append(duration.result()) + results = [f.result() for f in futures] - for duration_, result in zip(durations, results): - assert result == duration_ + for duration, result in zip(durations, results): + assert result == duration diff --git a/cluster_tools/tests/test_multiprocessing.py b/cluster_tools/tests/test_multiprocessing.py index cd3bbb56c..ad5371907 100644 --- a/cluster_tools/tests/test_multiprocessing.py +++ b/cluster_tools/tests/test_multiprocessing.py @@ -1,8 +1,5 @@ import logging import multiprocessing as mp -import os - -import pytest 
import cluster_tools @@ -52,33 +49,6 @@ def test_map_with_spawn() -> None: ).result(), "Multiprocessing should use `fork` if requested" -def accept_high_mem(data: str) -> int: - return len(data) - - -@pytest.mark.skip( - reason="This test does not pass on the CI. Probably because the machine does not have enough RAM." -) -def test_high_ram_usage() -> None: - very_long_string = " " * 10**6 * 2500 - - os.environ["MULTIPROCESSING_VIA_IO"] = "True" - - with cluster_tools.get_executor("multiprocessing") as executor: - fut1 = executor.submit( - accept_high_mem, - very_long_string, - __cfut_options={"output_pickle_path": "/tmp/test.pickle"}, # type: ignore[call-arg] - ) - assert fut1.result() == len(very_long_string) - - os.environ["MULTIPROCESSING_VIA_IO_TMP_DIR"] = "." - fut2 = executor.submit(accept_high_mem, very_long_string) - assert fut2.result() == len(very_long_string) - - del os.environ["MULTIPROCESSING_VIA_IO"] - - def test_executor_args() -> None: def pass_with(exc: cluster_tools.MultiprocessingExecutor) -> None: with exc: diff --git a/cluster_tools/uv.lock b/cluster_tools/uv.lock index 8c112b44c..4d8f77c53 100644 --- a/cluster_tools/uv.lock +++ b/cluster_tools/uv.lock @@ -13,15 +13,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/45/86/4736ac618d82a20d87d2f92ae19441ebc7ac9e7a581d7e58bbe79233b24a/asttokens-2.4.1-py2.py3-none-any.whl", hash = "sha256:051ed49c3dcae8913ea7cd08e46a606dba30b79993209636c4875bc1d637bc24", size = 27764 }, ] -[[package]] -name = "attrs" -version = "24.2.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/fc/0f/aafca9af9315aee06a89ffde799a10a582fe8de76c563ee80bbcdc08b3fb/attrs-24.2.0.tar.gz", hash = "sha256:5cfb1b9148b5b086569baec03f20d7b6bf3bcacc9a42bebf87ffaaca362f6346", size = 792678 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/6a/21/5b6702a7f963e95456c0de2d495f67bf5fd62840ac655dc451586d23d39a/attrs-24.2.0-py3-none-any.whl", hash = 
"sha256:81921eb96de3191c8258c199618104dd27ac608d9366f5e35d011eae1867ede2", size = 63001 }, -] - [[package]] name = "cachetools" version = "5.5.0" @@ -183,7 +174,7 @@ requires-dist = [ dev = [ { name = "icecream", specifier = "~=2.1.1" }, { name = "mypy", specifier = "~=1.0.0" }, - { name = "pytest", specifier = "~=7.2.1" }, + { name = "pytest", specifier = "~=8.3.3" }, { name = "ruff", specifier = "~=0.5.0" }, { name = "typing-extensions", specifier = "~=4.12.0" }, ] @@ -585,8 +576,6 @@ version = "6.0.0" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/18/c7/8c6872f7372eb6a6b2e4708b88419fb46b857f7a2e1892966b851cc79fc9/psutil-6.0.0.tar.gz", hash = "sha256:8faae4f310b6d969fa26ca0545338b21f73c6b15db7c4a8d934a5482faa818f2", size = 508067 } wheels = [ - { url = "https://files.pythonhosted.org/packages/c5/66/78c9c3020f573c58101dc43a44f6855d01bbbd747e24da2f0c4491200ea3/psutil-6.0.0-cp27-none-win32.whl", hash = "sha256:02b69001f44cc73c1c5279d02b30a817e339ceb258ad75997325e0e6169d8b35", size = 249766 }, - { url = "https://files.pythonhosted.org/packages/e1/3f/2403aa9558bea4d3854b0e5e567bc3dd8e9fbc1fc4453c0aa9aafeb75467/psutil-6.0.0-cp27-none-win_amd64.whl", hash = "sha256:21f1fb635deccd510f69f485b87433460a603919b45e2a324ad65b0cc74f8fb1", size = 253024 }, { url = "https://files.pythonhosted.org/packages/0b/37/f8da2fbd29690b3557cca414c1949f92162981920699cd62095a984983bf/psutil-6.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c588a7e9b1173b6e866756dde596fd4cad94f9399daf99ad8c3258b3cb2b47a0", size = 250961 }, { url = "https://files.pythonhosted.org/packages/35/56/72f86175e81c656a01c4401cd3b1c923f891b31fbcebe98985894176d7c9/psutil-6.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6ed2440ada7ef7d0d608f20ad89a04ec47d2d3ab7190896cd62ca5fc4fe08bf0", size = 287478 }, { url = 
"https://files.pythonhosted.org/packages/19/74/f59e7e0d392bc1070e9a70e2f9190d652487ac115bb16e2eff6b22ad1d24/psutil-6.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fd9a97c8e94059b0ef54a7d4baf13b405011176c3b6ff257c247cae0d560ecd", size = 290455 }, @@ -628,10 +617,9 @@ wheels = [ [[package]] name = "pytest" -version = "7.2.2" +version = "8.3.3" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "attrs" }, { name = "colorama", marker = "sys_platform == 'win32'" }, { name = "exceptiongroup", marker = "python_full_version < '3.11'" }, { name = "iniconfig" }, @@ -639,9 +627,9 @@ dependencies = [ { name = "pluggy" }, { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/b9/29/311895d9cd3f003dd58e8fdea36dd895ba2da5c0c90601836f7de79f76fe/pytest-7.2.2.tar.gz", hash = "sha256:c99ab0c73aceb050f68929bc93af19ab6db0558791c6a0715723abe9d0ade9d4", size = 1320028 } +sdist = { url = "https://files.pythonhosted.org/packages/8b/6c/62bbd536103af674e227c41a8f3dcd022d591f6eed5facb5a0f31ee33bbc/pytest-8.3.3.tar.gz", hash = "sha256:70b98107bd648308a7952b06e6ca9a50bc660be218d53c257cc1fc94fda10181", size = 1442487 } wheels = [ - { url = "https://files.pythonhosted.org/packages/b2/68/5321b5793bd506961bd40bdbdd0674e7de4fb873ee7cab33dd27283ad513/pytest-7.2.2-py3-none-any.whl", hash = "sha256:130328f552dcfac0b1cec75c12e3f005619dc5f874f0a06e8ff7263f0ee6225e", size = 317207 }, + { url = "https://files.pythonhosted.org/packages/6b/77/7440a06a8ead44c7757a64362dd22df5760f9b12dc5f11b6188cd2fc27a0/pytest-8.3.3-py3-none-any.whl", hash = "sha256:a6853c7375b2663155079443d2e45de913a911a11d669df02a50814944db57b2", size = 342341 }, ] [[package]] diff --git a/webknossos/script_collection/move_dataset_slices_by_one.py b/webknossos/script_collection/move_dataset_slices_by_one.py index 0dcd0f003..17dc4bf73 100644 --- 
a/webknossos/script_collection/move_dataset_slices_by_one.py +++ b/webknossos/script_collection/move_dataset_slices_by_one.py @@ -39,7 +39,7 @@ def create_parser() -> ArgumentParser: parser.add_argument( "--distribution_strategy", default="multiprocessing", - choices=["slurm", "kubernetes", "multiprocessing", "debug_sequential"], + choices=["slurm", "kubernetes", "multiprocessing", "sequential"], help="Strategy to distribute the task across CPUs or nodes.", ) diff --git a/webknossos/webknossos/cli/_utils.py b/webknossos/webknossos/cli/_utils.py index 25da08ad6..4e118ba28 100644 --- a/webknossos/webknossos/cli/_utils.py +++ b/webknossos/webknossos/cli/_utils.py @@ -26,7 +26,7 @@ class DistributionStrategy(str, Enum): SLURM = "slurm" KUBERNETES = "kubernetes" MULTIPROCESSING = "multiprocessing" - DEBUG_SEQUENTIAL = "debug_sequential" + SEQUENTIAL = "sequential" class LayerCategory(str, Enum): diff --git a/webknossos/webknossos/utils.py b/webknossos/webknossos/utils.py index 48d38a12e..fce17cd0a 100644 --- a/webknossos/webknossos/utils.py +++ b/webknossos/webknossos/utils.py @@ -94,7 +94,7 @@ def get_executor_for_args( job_resources=json.loads(args.job_resources), ) logging.info(f"Using {args.distribution_strategy} cluster.") - elif args.distribution_strategy == "debug_sequential": + elif args.distribution_strategy == "sequential": executor = get_executor( args.distribution_strategy, debug=True,