Fix TMCS starts too many processes and dies #329

Merged 47 commits on Apr 8, 2023
Commits
a790a84
Use Queue instead of passing coordinator to workers in TMCS
AnesBenmerzoug Mar 15, 2023
215246a
Instantiate all shapley workers inside get_shapley_workers
AnesBenmerzoug Mar 15, 2023
ca25ed4
Add queue timeout parameter
AnesBenmerzoug Mar 15, 2023
4199098
Create separate test module for tmcs
AnesBenmerzoug Mar 16, 2023
cccd7a5
Delete actors code, Add new abstraction based on the concurrent futur…
AnesBenmerzoug Mar 17, 2023
dac64cb
Fix type hint
AnesBenmerzoug Mar 17, 2023
374b4d1
Fix type hints
AnesBenmerzoug Mar 18, 2023
5d13445
Update data utility learning notebook
AnesBenmerzoug Mar 18, 2023
6d0a13b
Update shapley basic spotify notebook
AnesBenmerzoug Mar 18, 2023
7ac8621
Remove shapley actor module
AnesBenmerzoug Mar 18, 2023
fb2c91d
Update changelog
AnesBenmerzoug Mar 18, 2023
1038a57
Add docstring for init_executor
AnesBenmerzoug Mar 18, 2023
e50911a
Use zeros method of ValuationResult instead of deprecated empty method
AnesBenmerzoug Mar 18, 2023
25410ff
Add docstring for submit method
AnesBenmerzoug Mar 18, 2023
59aa9b5
Use deprecation warning for coordinator_update_period and worker_upda…
AnesBenmerzoug Mar 19, 2023
751f327
Rename n_local_workers to n_workers
AnesBenmerzoug Mar 21, 2023
86248f1
Lots of changes
AnesBenmerzoug Mar 21, 2023
a0e5702
Add tests for RayExecutor
AnesBenmerzoug Mar 21, 2023
9fd5a0e
Rerun shapley basic spotify notebook
AnesBenmerzoug Mar 21, 2023
5eebc56
Refactor futures module into futures packages
AnesBenmerzoug Mar 21, 2023
9c00425
Use ThreadPoolExecutor with max_workers=1 for sequential parallel bac…
AnesBenmerzoug Mar 21, 2023
933ba65
Add calls to sleep inside work item manager thread loops
AnesBenmerzoug Mar 21, 2023
a873edc
Update tests
AnesBenmerzoug Mar 21, 2023
57c34de
Allow using TMCS with sequential, update tests
AnesBenmerzoug Mar 21, 2023
c119a2b
Add check to parallel config when using external ray cluster
AnesBenmerzoug Mar 21, 2023
d8d5ae4
Update tests
AnesBenmerzoug Mar 21, 2023
898f04b
Check if work item manager thread exists before calling join
AnesBenmerzoug Mar 21, 2023
73c6aa1
Fix type hints
AnesBenmerzoug Mar 21, 2023
fc74d14
Track submitted futures in a weakset, call ray.cancel to cancel the t…
AnesBenmerzoug Mar 28, 2023
13771d0
Override the base Executor class' __exit__ method
AnesBenmerzoug Mar 28, 2023
f0ed3f0
Recommend using init_executor instead of instantiating executor directly
AnesBenmerzoug Mar 28, 2023
0bbc10a
Fix docstrings
AnesBenmerzoug Mar 28, 2023
c7448dd
Add n_cpus_per_job field to ParallelConfig
AnesBenmerzoug Apr 3, 2023
27c6cff
Add cancel_futures_on_exit parameter to RayExecutor
AnesBenmerzoug Apr 3, 2023
8e35c06
Merge branch 'develop' into 292-tmcs-starts-too-many-processes-and-dies
AnesBenmerzoug Apr 3, 2023
ee46ecf
Apply suggestions from code review
AnesBenmerzoug Apr 3, 2023
f1a9a65
Remove private broken flag from RayExecutor
AnesBenmerzoug Apr 3, 2023
7210663
Update changelog
AnesBenmerzoug Apr 3, 2023
f01c99f
Merge branch 'develop' into 292-tmcs-starts-too-many-processes-and-dies
mdbenito Apr 4, 2023
63db9c2
Update src/pydvl/utils/config.py
AnesBenmerzoug Apr 5, 2023
d713c74
Apply suggestions from code review
AnesBenmerzoug Apr 5, 2023
64dcde6
Revert changes to wrap()
AnesBenmerzoug Apr 6, 2023
f3d40e7
Remove n_cpus_per_job from ParallelConfig, rename n_workers to n_cpus…
AnesBenmerzoug Apr 6, 2023
2e229a5
Change check for kwargs length inside parallel backend wrap method
AnesBenmerzoug Apr 6, 2023
da3b111
Update fixture
AnesBenmerzoug Apr 6, 2023
14bb2bf
Use n_jobs as max_workers in TMCS
AnesBenmerzoug Apr 6, 2023
fbbb0a3
Pass options to the ray remote function as part of the submit method'…
AnesBenmerzoug Apr 6, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+- Create new `RayExecutor` class based on the concurrent.futures API,
+  use the new class to fix an issue with Truncated Monte Carlo Shapley
+  (TMCS) starting too many processes and dying, plus other small changes
+  [PR #329](https://github.com/appliedAI-Initiative/pyDVL/pull/329)
 - Fix creation of GroupedDataset objects using the `from_arrays`
   and `from_sklearn` class methods
   [PR #324](https://github.com/appliedAI-Initiative/pyDVL/pull/334)
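The changelog entry above says the actor-based coordinator/worker machinery was replaced by an executor following the `concurrent.futures` API. As a rough illustration of the API contract the new `RayExecutor` mirrors, here is a standard-library-only sketch (this uses `ThreadPoolExecutor`, not pyDVL's class):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(x: int) -> int:
    return x * x


# Executors expose submit() -> Future and act as context managers that
# shut down their workers on exit; that is the interface contract the
# new RayExecutor adopts in place of hand-rolled coordinator/worker actors.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(square, i) for i in range(4)]
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [0, 1, 4, 9]
```

Because workers are bounded by `max_workers` and cleaned up by the context manager, this design avoids the unbounded process creation the PR title describes.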
156 changes: 85 additions & 71 deletions notebooks/shapley_basic_spotify.ipynb

Large diffs are not rendered by default.

238 changes: 146 additions & 92 deletions notebooks/shapley_utility_learning.ipynb

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions src/pydvl/utils/config.py
@@ -13,17 +13,23 @@
 class ParallelConfig:
     """Configuration for parallel computation backend.
 
-    :param backend: Type of backend to use. For now only 'ray' is supported.
+    :param backend: Type of backend to use.
+        Defaults to 'ray'
     :param address: Address of existing remote or local cluster to use.
-    :param n_local_workers: Number of workers (CPUs) to use when using a local ray cluster.
+    :param n_cpus_local: Number of CPUs to use when creating a local ray cluster.
+        This has no effect when using an existing ray cluster.
     :param logging_level: Logging level for the parallel backend's worker.
     """
 
     backend: Literal["sequential", "ray"] = "ray"
     address: Optional[Union[str, Tuple[str, int]]] = None
-    n_local_workers: Optional[int] = None
+    n_cpus_local: Optional[int] = None
     logging_level: int = logging.WARNING
+
+    def __post_init__(self) -> None:
+        if self.address is not None and self.n_cpus_local is not None:
+            raise ValueError("When `address` is set, `n_cpus_local` should be None.")
 
 
 @dataclass
 class MemcachedClientConfig:
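The new `__post_init__` check makes `address` and `n_cpus_local` mutually exclusive: a CPU count only makes sense when starting a local cluster, not when connecting to an existing one. A standalone sketch of that validation pattern (the dataclass below is a stand-in for illustration, not pyDVL's actual `ParallelConfig`):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConfigSketch:
    # Hypothetical stand-in mirroring the two new mutually exclusive fields.
    address: Optional[str] = None
    n_cpus_local: Optional[int] = None

    def __post_init__(self) -> None:
        # dataclasses call __post_init__ after __init__, so invalid
        # combinations are rejected at construction time.
        if self.address is not None and self.n_cpus_local is not None:
            raise ValueError("When `address` is set, `n_cpus_local` should be None.")


ConfigSketch(n_cpus_local=4)  # fine: local cluster with 4 CPUs
try:
    ConfigSketch(address="ray://head:10001", n_cpus_local=4)
except ValueError as e:
    print(e)  # When `address` is set, `n_cpus_local` should be None.
```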
2 changes: 1 addition & 1 deletion src/pydvl/utils/parallel/__init__.py
@@ -1,3 +1,3 @@
-from .actor import *
 from .backend import *
+from .futures import *
 from .map_reduce import *
136 changes: 0 additions & 136 deletions src/pydvl/utils/parallel/actor.py

This file was deleted.

28 changes: 15 additions & 13 deletions src/pydvl/utils/parallel/backend.py
@@ -16,7 +16,6 @@
 
 import ray
 from ray import ObjectRef
-from ray.remote_function import RemoteFunction
 
 from ..config import ParallelConfig

@@ -93,18 +92,16 @@ def __repr__(self) -> str:
 
 
 class SequentialParallelBackend(BaseParallelBackend, backend_name="sequential"):
-    """Class used to run jobs sequentially and locally. It shouldn't
-    be initialized directly. You should instead call `init_parallel_backend`.
+    """Class used to run jobs sequentially and locally.
+
+    It shouldn't be initialized directly. You should instead call
+    :func:`~pydvl.utils.parallel.backend.init_parallel_backend`.
 
     :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with number of cpus
     """
 
     def __init__(self, config: ParallelConfig):
-        config_dict = asdict(config)
-        config_dict.pop("backend")
-        config_dict.pop("address")
-        config_dict["num_cpus"] = config_dict.pop("n_local_workers")
-        self.config = config_dict
+        self.config = {}
 
     def get(self, v: Any, *args, **kwargs):
         return v
@@ -126,8 +123,10 @@ def _effective_n_jobs(self, n_jobs: int) -> int:
 
 
 class RayParallelBackend(BaseParallelBackend, backend_name="ray"):
-    """Class used to wrap ray to make it transparent to algorithms. It shouldn't
-    be initialized directly. You should instead call `init_parallel_backend`.
+    """Class used to wrap ray to make it transparent to algorithms.
+
+    It shouldn't be initialized directly. You should instead call
+    :func:`~pydvl.utils.parallel.backend.init_parallel_backend`.
 
     :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with
         cluster address, number of cpus, etc.
@@ -136,7 +135,9 @@ class RayParallelBackend(BaseParallelBackend, backend_name="ray"):
     def __init__(self, config: ParallelConfig):
         config_dict = asdict(config)
         config_dict.pop("backend")
-        config_dict["num_cpus"] = config_dict.pop("n_local_workers")
+        n_cpus_local = config_dict.pop("n_cpus_local")
+        if config_dict.get("address", None) is None:
+            config_dict["num_cpus"] = n_cpus_local
         self.config = config_dict
         if not ray.is_initialized():
             ray.init(**self.config)
@@ -169,7 +170,7 @@ def wrap(self, fun: Callable, **kwargs) -> Callable:
 
         :return: The `.remote` method of the ray `RemoteFunction`.
         """
-        if len(kwargs) > 1:
+        if len(kwargs) > 0:
             return ray.remote(**kwargs)(fun).remote  # type: ignore
         return ray.remote(fun).remote  # type: ignore
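The one-character change in `wrap` fixes an off-by-one: with `len(kwargs) > 1`, a call passing exactly one option (e.g. `num_cpus=1`) silently fell through to the no-options branch and the option was dropped. A ray-free sketch of the corrected dispatch logic (the `decorate` helper below is a hypothetical stand-in for `ray.remote`):

```python
def decorate(fun, **options):
    # Stand-in for ray.remote: records which options actually took effect.
    def wrapper(*args, **kwargs):
        return fun(*args, **kwargs), options
    return wrapper


def wrap(fun, **kwargs):
    # The old buggy check was `len(kwargs) > 1`, so a single keyword
    # argument such as num_cpus=1 was silently ignored.
    if len(kwargs) > 0:
        return decorate(fun, **kwargs)
    return decorate(fun)


result, opts = wrap(lambda x: x + 1, num_cpus=1)(41)
print(result, opts)  # 42 {'num_cpus': 1}
```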

@@ -201,7 +202,8 @@ def init_parallel_backend(
 ) -> BaseParallelBackend:
     """Initializes the parallel backend and returns an instance of it.
 
-    :param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc.
+    :param config: instance of :class:`~pydvl.utils.config.ParallelConfig`
+        with cluster address, number of cpus, etc.
 
     :Example:

54 changes: 54 additions & 0 deletions src/pydvl/utils/parallel/futures/__init__.py
@@ -0,0 +1,54 @@
from concurrent.futures import Executor, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Generator, Optional

from pydvl.utils.config import ParallelConfig
from pydvl.utils.parallel.futures.ray import RayExecutor

__all__ = ["init_executor"]


@contextmanager
def init_executor(
max_workers: Optional[int] = None,
config: ParallelConfig = ParallelConfig(),
**kwargs,
) -> Generator[Executor, None, None]:
"""Initializes a futures executor based on the passed parallel configuration object.

:param max_workers: Maximum number of concurrent tasks.
:param config: instance of :class:`~pydvl.utils.config.ParallelConfig` with cluster address, number of cpus, etc.
:param kwargs: Other optional parameters that will be passed to the executor.

:Example:

>>> from pydvl.utils.parallel.futures import init_executor
>>> from pydvl.utils.config import ParallelConfig
>>> config = ParallelConfig(backend="ray")
>>> with init_executor(max_workers=3, config=config) as executor:
... pass

>>> from pydvl.utils.parallel.futures import init_executor
>>> with init_executor() as executor:
... future = executor.submit(lambda x: x + 1, 1)
... result = future.result()
...
>>> print(result)
2

>>> from pydvl.utils.parallel.futures import init_executor
>>> with init_executor() as executor:
... results = list(executor.map(lambda x: x + 1, range(5)))
...
>>> print(results)
[1, 2, 3, 4, 5]

"""
if config.backend == "ray":
with RayExecutor(max_workers, config=config, **kwargs) as executor:
yield executor
elif config.backend == "sequential":
with ThreadPoolExecutor(1) as executor:
yield executor
else:
raise NotImplementedError(f"Unexpected parallel type {config.backend}")
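`init_executor` above dispatches on `config.backend` and yields the executor from inside a `with` block, so worker cleanup happens automatically when the caller's `with` statement exits. The same context-manager pattern can be sketched with only the standard library (backend names and the `make_executor` name below are illustrative, not pyDVL's API):

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from contextlib import contextmanager
from typing import Generator, Optional


@contextmanager
def make_executor(
    backend: str = "sequential", max_workers: Optional[int] = None
) -> Generator[Executor, None, None]:
    # Mirrors init_executor's structure: pick an Executor implementation
    # by backend name, yield it, and let the enclosing with-block
    # trigger shutdown when the caller is done.
    if backend == "sequential":
        with ThreadPoolExecutor(1) as executor:
            yield executor
    elif backend == "threads":
        with ThreadPoolExecutor(max_workers) as executor:
            yield executor
    else:
        raise NotImplementedError(f"Unexpected parallel type {backend}")


with make_executor() as ex:
    assert ex.submit(lambda x: x + 1, 1).result() == 2
```

Wrapping the executor's own context manager inside `@contextmanager` keeps the caller's code identical regardless of which backend is selected.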