
Fix TMCS starts too many processes and dies #329

Merged
merged 47 commits into develop from 292-tmcs-starts-too-many-processes-and-dies
Apr 8, 2023

Conversation

Collaborator

@AnesBenmerzoug AnesBenmerzoug commented Mar 18, 2023

Description

This PR closes #292

It does so by using an abstraction based on concurrent.futures instead of actors.

I first tried to use Ray queues to avoid passing the coordinator to the workers, but they also rely on an actor and it kept dying.
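
For illustration, a minimal sketch of the pattern (not the actual pyDVL code; coordinator_loop and its arguments are made-up names): the coordinator keeps a bounded window of futures in flight and consumes results as they complete, instead of exchanging messages with long-lived actors.

```python
from concurrent.futures import FIRST_COMPLETED, Executor, ThreadPoolExecutor, wait
from typing import Callable, List


def coordinator_loop(
    executor: Executor,
    task: Callable[[], float],
    n_jobs: int,
    n_iterations: int,
) -> List[float]:
    """Keep at most n_jobs futures pending and collect results as they finish."""
    results: List[float] = []
    pending: set = set()
    submitted = 0
    while len(results) < n_iterations:
        # Top up the window of in-flight tasks.
        while len(pending) < n_jobs and submitted < n_iterations:
            pending.add(executor.submit(task))
            submitted += 1
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        results.extend(future.result() for future in done)
    return results


if __name__ == "__main__":
    # Works with any Executor, e.g. the RayExecutor added in this PR
    # or a plain ThreadPoolExecutor.
    with ThreadPoolExecutor(max_workers=2) as ex:
        print(sum(coordinator_loop(ex, lambda: 1.0, n_jobs=2, n_iterations=10)))
```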

Changes

  • Added a RayExecutor class based on concurrent.futures to the parallel package.
  • Used the new concurrent.futures executor abstraction in TMCS.
  • Removed the abstract and Shapley actor modules.
  • Moved TMCS tests to a separate module.
  • Fixed the check for the number of subsets in the Data Utility Learning class.
  • Updated Data Utility Learning notebook.
  • Updated Shapley Basic Spotify notebook.

EDIT More changes:

  • Removed n_concurrent_computations and used n_jobs instead to mean the number of tasks to submit before waiting for results.
  • Replaced n_local_workers in the ParallelConfig class with n_workers and used it to set max_workers in the given Executor.
  • Added a __post_init__ method to ParallelConfig to make sure that n_workers is None when address is set (see the sketch after this list).
  • Added tests specifically for the executor.
  • For the 'sequential' parallel backend, used a ThreadPoolExecutor with max_workers=1.
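
A rough sketch of what these changes amount to (field names follow the list above; the defaults, types and the init_executor helper are assumptions for illustration, not pyDVL's actual API):

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ParallelConfig:
    backend: str = "ray"
    address: Optional[str] = None
    n_workers: Optional[int] = None  # used as max_workers of the Executor

    def __post_init__(self):
        # Connecting to an existing cluster and sizing a local one are
        # mutually exclusive.
        if self.address is not None and self.n_workers is not None:
            raise ValueError("n_workers cannot be set when address is provided")


def init_executor(config: ParallelConfig) -> Executor:
    if config.backend == "sequential":
        # The 'sequential' backend degrades to a single-threaded executor.
        return ThreadPoolExecutor(max_workers=1)
    raise NotImplementedError(f"Backend {config.backend!r} not shown in this sketch")
```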

EDIT 2 More changes:

  • Added an n_cpus_per_job field to ParallelConfig.
  • Added a cancel_futures_on_exit boolean parameter to RayExecutor.

EDIT 3 More changes:

  • Renamed n_workers in ParallelConfig to n_cpus_local to align more closely with its meaning in Ray.
  • Removed n_cpus_per_job from ParallelConfig and passed it instead as an option to the executor's submit method as part of the kwargs parameter; otherwise mypy complains that the method does not have the same signature as the one defined in the base Executor class.
  • Used max_workers in RayExecutor as the maximum number of submitted jobs, taking its value from n_jobs instead of n_workers (which was renamed to n_cpus_local).
  • Added a new variable inside TMCS with a value of 2 * effective_n_jobs to represent the total number of submitted jobs, including the jobs that are running (see the sketch after this list).
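
For reference, a condensed sketch of the resulting shape (simplified and not the actual implementation; it relies on Ray's ObjectRef.future() and omits shutdown and cancellation handling):

```python
from concurrent.futures import Executor, Future
from typing import Any, Callable

import ray


class RayExecutor(Executor):
    """Reduced sketch of the executor described above."""

    def __init__(self, max_workers: int, *, cancel_futures_on_exit: bool = True):
        self._max_workers = max_workers  # maximum number of submitted jobs
        self._cancel_futures_on_exit = cancel_futures_on_exit

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Future:
        # Per-task resources travel inside kwargs so that the signature stays
        # identical to concurrent.futures.Executor.submit (keeps mypy happy).
        n_cpus_per_job = kwargs.pop("n_cpus_per_job", 1)
        remote_fn = ray.remote(num_cpus=n_cpus_per_job)(fn)
        return remote_fn.remote(*args, **kwargs).future()


# Inside TMCS, the submission window is then bounded as:
# total_submitted_jobs = 2 * effective_n_jobs  # running + queued
```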

Checklist

  • Wrote Unit tests (if necessary)
  • Updated Documentation (if necessary)
  • Updated Changelog
  • If notebooks were added/changed, ensured that added boilerplate cells are tagged with "nbsphinx": "hidden"

@AnesBenmerzoug AnesBenmerzoug added this to the v0.7.0 milestone Mar 18, 2023
@AnesBenmerzoug AnesBenmerzoug self-assigned this Mar 18, 2023
It is almost the same as the one from the base Executor class,
but it escapes the start characters because Sphinx complains
about a starting emphasis character without a matching ending character.
@AnesBenmerzoug AnesBenmerzoug marked this pull request as ready for review March 19, 2023 08:22
Collaborator

@mdbenito mdbenito left a comment

Besides the cancelling of tasks, this PR has highlighted our inconsistent (and possibly bogus) use of n_jobs everywhere. I think we need to fix it.

More generally, I think that we are not really using ray as it's supposed to be used. For one, ParallelConfig.n_local_workers is used for num_cpus in ray.init(), which does not have the effect we document: instead, it's the number of CPUs for a "raylet" (which I guess is the number of CPUs available to each node). That's fine when starting a local cluster, but probably not when using an existing one.

What do you think about the idea of setting max_workers in the parallel config (making it a no-op for ray, or maybe a check against the number of nodes available in a running cluster), then using n_jobs as the number of tasks, and setting num_cpus to 1 in the call to ray.remote()?
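
A possible shape for that proposal (purely illustrative; run_permutation and the numbers are placeholders, not pyDVL code):

```python
import ray

ray.init()  # cluster size is whatever the (local or remote) cluster provides

n_jobs = 4  # number of tasks we decide to submit, independent of cluster size


@ray.remote(num_cpus=1)  # each task reserves exactly one logical CPU
def run_permutation(seed: int) -> int:
    return seed  # placeholder for the actual work


results = ray.get([run_permutation.remote(i) for i in range(n_jobs)])
```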

Code review comments on: src/pydvl/value/shapley/truncated.py, src/pydvl/utils/parallel/futures.py
@AnesBenmerzoug
Collaborator Author

@mdbenito let's discuss this during the next meeting.

@AnesBenmerzoug AnesBenmerzoug modified the milestones: v0.7.0, v0.6.1 Apr 3, 2023
Collaborator

@mdbenito mdbenito left a comment

I still think that there are some inconsistencies wrt. max_workers. The number of CPUs available in the cluster is an external factor over which the code has no control. So we must ignore that, in particular in ray.init(), where num_cpus does not refer to the number of CPUs used for a local cluster.

max_workers could then be used as either:

  1. the maximum number of vCPUs to be used by the executor (num_jobs * num_cpus_per_job), or
  2. the maximum number of tasks to be run by the executor, so that effective_cpus_used = max_workers * n_cpus_per_job (see the numeric sketch after this comment).

We need to fix the names once and for all:

  • task = job
  • worker = single-core process = CPU

I find the second one horrible, but that seems to be Ray's convention, right? We don't have to follow it, though: in the ParallelConfig and elsewhere we could use max_cpus instead of max_workers. The question is then what to do when we allow for additional resources like GPUs.
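
To make the two readings concrete (arbitrary numbers):

```python
n_jobs = 4
n_cpus_per_job = 2

# Reading 1: max_workers caps the vCPUs used by the executor.
max_workers_reading_1 = n_jobs * n_cpus_per_job               # 8 vCPUs

# Reading 2: max_workers caps the number of tasks run by the executor,
# and the CPU usage is derived from it.
max_workers_reading_2 = n_jobs                                # 4 tasks
effective_cpus_used = max_workers_reading_2 * n_cpus_per_job  # 8 vCPUs
```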

Code review comments on: src/pydvl/utils/config.py, src/pydvl/utils/parallel/futures/__init__.py, src/pydvl/utils/parallel/futures/ray.py, src/pydvl/value/shapley/truncated.py, tests/utils/conftest.py, src/pydvl/utils/parallel/backend.py
AnesBenmerzoug and others added 2 commits April 5, 2023 14:20
Co-authored-by: Miguel de Benito Delgado <[email protected]>
Co-authored-by: Miguel de Benito Delgado <[email protected]>
@AnesBenmerzoug
Collaborator Author

AnesBenmerzoug commented Apr 5, 2023

@mdbenito I read the Ray documentation and architecture description more thoroughly, and here's what I found:

  • According to this section of their documentation:

    • Resource requirements of tasks or actors do NOT impose limits on actual physical resource usage.
    • Ray doesn’t provide CPU isolation for tasks or actors.
  • According to this other section of their documentation:

    • By default, Ray nodes start with pre-defined CPU, GPU, and memory resources. The quantities of these resources on each node are set to the physical quantities auto-detected by Ray. By default, logical resources are configured by the following rule:
      • Number of logical CPUs (num_cpus): Set to the number of CPUs of the machine/container.
    • Using ray.init() to start a single-node Ray cluster and setting num_cpus will start a Ray node with num_cpus logical CPUs, i.e. num_cpus worker processes.
  • According to yet another section of their documentation:

    • Ray allows specifying a task or actor’s resource requirements (e.g., CPU, GPU, and custom resources). The task or actor will only run on a node if there are enough required resources available to execute the task or actor.

I finally understand this better. Thanks for the link to that document.
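
The logical-resource bookkeeping quoted above can be checked locally with a few lines (assuming a local Ray installation; the numbers are arbitrary):

```python
import ray

# Start a single-node cluster with 4 logical CPUs; Ray will start roughly
# that many worker processes, regardless of the machine's physical cores.
ray.init(num_cpus=4)

print(ray.cluster_resources())    # e.g. {'CPU': 4.0, 'memory': ..., ...}
print(ray.available_resources())  # logical accounting, not physical isolation
```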

Here's what I suggest:

  • Change n_workers in ParallelConfig to n_cpus_local to align more closely with its meaning in Ray.
  • Remove n_cpus_per_job from ParallelConfig and pass it instead as an option to the executor's submit method. I still think this needs more thought, though.
  • Use max_workers in RayExecutor as the maximum number of submitted jobs, taking its value from n_jobs instead of n_workers.
  • Add another argument called queue_size or something similar to TMCS to represent the number of tasks that will be submitted at each iteration. It can default to 2 * effective_n_jobs.

What do you think?

…s kwargs

This is done because mypy complains if we don't have the same signature as the base Executor class
@AnesBenmerzoug AnesBenmerzoug merged commit 1a31aba into develop Apr 8, 2023
@mdbenito mdbenito deleted the 292-tmcs-starts-too-many-processes-and-dies branch May 16, 2023 08:56