Add Executor class and CLI arguments for parallelism #608
Conversation
```diff
-        results = fct(**unflatten(kwargs))
+        results = self.executor.wait(
+            [self.executor.submit(fct, **unflatten(kwargs))]
+        )[0]
```
Instead of always running a task within a task for trial execution, maybe we can provide an option so that the trial executor can be different from the HPO parallel executor. Also, the objective function is defined by the user; if they are already using Dask submit inside it, this could make a weird nested task structure: orion worker task -> trial task -> trial-launched tasks.
The reason to use `wait(submit(trial))` is to have a breakdown of the worker's work into a series of tasks. For instance, it makes it possible to visualize the Gantt chart in the Dask dashboard. If the trial execution is parallelizing work as well, I think it makes a sensible structure.

```
|----------------------- Worker 1 task -------------------------------|
|------ trial 1 execution ------|     |------ trial 2 execution ------|
  |---- task 0 in trial ----|           |---- task 0 in trial ----|
  |---- task 1 in trial ----|           |---- task 1 in trial ----|
  |---- task 3 in trial ----|           |---- task 3 in trial ----|

|----------------------- Worker 2 task -------------------------------|
|------ trial 1 execution ------|     |------ trial 2 execution ------|
  |---- task 0 in trial ----|           |---- task 0 in trial ----|
  |---- task 1 in trial ----|           |---- task 1 in trial ----|
  |---- task 3 in trial ----|           |---- task 3 in trial ----|

<---- time --->
```

If the user uses Dask for the parallelisation of Oríon and within its task, using `secede` and `rejoin` makes it possible to have multiple nested layers of execution. It is also possible to do this with joblib using different `Parallel()` calls.
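For illustration, here is a minimal sketch of that nesting with dask.distributed, assuming a local cluster; `objective` and `heavy_step` are made-up names for this sketch, not part of Oríon:

```python
# Sketch only: a trial task that launches its own sub-tasks on the same Dask
# cluster, using secede()/rejoin() so the worker thread is freed while waiting.
from distributed import Client, get_client, rejoin, secede


def heavy_step(x, i):
    # Placeholder for work parallelized inside a single trial.
    return (x + i) ** 2


def objective(x):
    client = get_client()  # client of the worker currently running this task
    futures = [client.submit(heavy_step, x, i) for i in range(3)]
    secede()               # leave the worker's thread pool while blocked
    results = client.gather(futures)
    rejoin()               # re-enter the pool to finish the trial task
    return sum(results)


if __name__ == "__main__":
    client = Client()      # local cluster, for the sketch only
    trials = [client.submit(objective, x) for x in (0.1, 0.2)]
    print(client.gather(trials))
```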
Okay, this nested structure does seem convenient for Dask visualization. One of my points was that since we are using the same executor for the worker task and the trial task, it always has to be this way, even if I may just want to run my objective function in the same process as the worker task for efficiency, instead of having another Dask task execute it.
Right, the overhead may not be justified in many contexts. We could make it optional. I have difficulty figuring out what a good name for this option would be. 🤔 Maybe that's a bad sign. 😅 Something like `nest_execution`? 🤔 🤔 🤔
`nest_execution` sounds good. `trial_executor` could also be a choice: it could take a dict with the same format as the worker `executor` configuration, be none by default, and support `dask`, `joblib`, or simply `worker`, meaning the same executor as the worker's (see the sketch below). I guess this is something good to have; maybe we can add it later.
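To make the proposal concrete, a hypothetical configuration could look like the sketch below; `trial_executor` is not an existing option at the time of this discussion, and the other key names simply mirror the ones discussed in this PR:

```python
# Hypothetical worker configuration illustrating the proposed option; none of
# this is implemented as-is.
worker_config = {
    "n_workers": 4,
    "executor": "dask",                          # executor running the HPO workers
    "executor_configuration": {"n_workers": 4},
    # Proposed: how each trial's objective function is executed.
    #   None            -> run inline in the worker process (no extra task)
    #   "worker"        -> reuse the worker executor (behaviour of this PR)
    #   "dask"/"joblib" -> a separate executor, configured like the worker one
    "trial_executor": None,
}
```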
I like your idea! This would give more flexibility. I'll see if I can work out something simple for this PR.
This would require additional configuration options to be coherent with the other options (also support in the global config, not only function arguments). I would be in favor of pushing it to the next release, then, to make sure this one goes out on time. What do you think?
yeah, I agree with you :)
ok so I will merge right now and complete the dask example inside PR #557
Why: Trying to import a class that has the Factory as a metaclass fails if the Factory is not already imported. This is because during the creation of the class it will try to do package discovery, thus trying to import the class that is being created, which triggers the Factory `__init__`... To be clear, this happens if the said class is registered in the Python entry points.
How: The package discovery should be executed lazily when objects are created with the Factory, not when the class is created. This also makes it possible to create new Factory classes at run-time.
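The idea can be illustrated with a generic sketch (this is not Oríon's actual Factory code); the entry-point group name is made up, and the `group=` filter of `importlib.metadata.entry_points` assumes Python 3.10+:

```python
# Sketch of lazy entry-point discovery in a factory metaclass. Discovery runs
# on the first instantiation, not at class-creation time, so importing a class
# that is itself registered as an entry point does not recurse into discovery.
import importlib.metadata


class LazyFactory(type):
    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls._registry = None  # nothing discovered yet

    def _discover(cls):
        cls._registry = {
            ep.name: ep.load()
            for ep in importlib.metadata.entry_points(group="myproject.plugins")
        }

    def __call__(cls, of_type, *args, **kwargs):
        if cls._registry is None:
            cls._discover()  # lazy: only when an object is actually requested
        return cls._registry[of_type](*args, **kwargs)
```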
Why: The parallelism backend requires more setup than what joblib.parallel_backend offers, so we need to build our own.
How: The backend is fairly simple and only exposes `submit` to register tasks and `wait` to wait for the results of all `future`s. There is also a context manager to switch the executor of the experiment temporarily. This may be useful to do a short distributed optimization and then switch to local parallelism.
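As a usage sketch of that interface: only `submit` and `wait` are taken from the description above; the import path and constructor signature are assumptions and may differ from the actual code.

```python
# Sketch only: submit tasks to the executor and wait for all of their results.
from orion.executor.base import Executor  # assumed import path


def objective(x):
    return (x - 3) ** 2


executor = Executor("joblib", n_workers=2)  # assumed constructor signature
futures = [executor.submit(objective, x) for x in range(4)]
results = executor.wait(futures)            # blocks until every future is done
print(results)
```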
Why: The optimization was tied to joblib. We now move to Executor to allow using multiple backends directly instead of through joblib.
How: All joblib.delayed() calls are converted to executor.submit() and joblib.Parallel()() is converted to executor.wait(). The experiment is assigned a default executor if none is passed. It is possible to use a temporary executor with `client.tmp_executor` so that the default one is reassigned at the end of the context. This will prove useful when some executor temporarily wraps the storage, so that outside of the parallel execution we can be sure to access the unwrapped storage.
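The conversion pattern, sketched with an illustrative function and the `executor` instance from the previous sketch:

```python
from joblib import Parallel, delayed


def run_trial(i):
    return i ** 2


# Before: parallelism expressed directly through joblib.
results = Parallel(n_jobs=2)(delayed(run_trial)(i) for i in range(4))

# After: the same work goes through the Executor interface, so any supported
# backend can run it (`executor` is the instance built in the previous sketch).
futures = [executor.submit(run_trial, i) for i in range(4)]
results = executor.wait(futures)
```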
Why: The new Executor backend was only usable through the Python API so far.
How: This commit adds the arguments `--n-workers` and `--executor`, plus support in global/local configs for `worker/executor_configuration`.
Note: Setting the signal inside the consumer call was problematic because it should only be set in the main thread. Because of this, the signal is now set in `cli/hunt.py`.