
Are there asynchronous parallel examples? #896

Closed
deng-cy opened this issue Apr 5, 2022 · 9 comments

@deng-cy

deng-cy commented Apr 5, 2022

I have a function that needs to be evaluated remotely:

def evaluate(parameter, server_id):
    # send `parameter` to the server identified by `server_id`, wait for the result `res`
    return {'metric': (res, 0)}

For instance, I have 5 servers and want to optimize the metric in parallel. Currently the only way I can think of is to run the following:

import multiprocessing as mp

ps = []
trial_indices = []
for i in range(5):
    parameters, trial_index = ax_client.get_next_trial()
    p = mp.Process(target=evaluate, args=(parameters, i))
    p.start()
    ps.append(p)
    trial_indices.append(trial_index)
for i in range(5):
    ps[i].join()
    # retrieve data for this trial, then report it to Ax
    ax_client.complete_trial(trial_index=trial_indices[i], raw_data=retrieved_data)

This is effectively synchronous, since the whole batch must finish before any new trials start, which is less efficient. Is there a way to construct an asynchronous parallel evaluation?

@sgbaird
Contributor

sgbaird commented Apr 5, 2022

@deng-cy Have you taken a look at https://ax.dev/tutorials/raytune_pytorch_cnn.html?

@deng-cy
Author

deng-cy commented Apr 5, 2022

Thanks for your reply. Yes, I did, but it feels like a black box that doesn't explain how to assign server_id.

@mpolson64
Contributor

mpolson64 commented Apr 5, 2022

Hi @deng-cy thank you for reaching out! Your use case may be a good candidate for our Scheduler API, which has a tutorial here.

It involves a little more overhead to set up compared to the Service API: you define a Runner object to send trials to your external system and Metric objects to retrieve the data once a trial has been run. In exchange, it gives you the ability to set maximum parallelism (among other settings) and lets Ax handle scheduling, new point generation, trial deployment, and polling for you automatically. Personally this is my favorite way of using Ax, as it allows the user to "set it and forget it", which I find more than worth the effort of setting the Scheduler up for most experiments.
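
Roughly, the moving pieces look like the sketch below. This is a rough skeleton only, loosely following the Scheduler tutorial; exact class and method signatures vary between Ax versions, and submit_to_server / fetch_result_from_server are hypothetical stand-ins for however you talk to your servers.

# Rough skeleton, loosely following the Scheduler tutorial; not tested as-is.
# `submit_to_server` and `fetch_result_from_server` are hypothetical helpers
# for your own server communication.
import pandas as pd

from ax.core.data import Data
from ax.core.metric import Metric
from ax.core.runner import Runner
from ax.service.scheduler import Scheduler, SchedulerOptions


class RemoteServerRunner(Runner):
    def run(self, trial):
        # Deploy the trial's arm to one of your servers and stash the job
        # handle in the trial's run metadata.
        job_id = submit_to_server(trial.arm.parameters)  # hypothetical helper
        return {"job_id": job_id}

    def poll_trial_status(self, trials):
        # The tutorial's runner also reports which trials have finished, so
        # the Scheduler knows when to fetch their data.
        ...


class RemoteServerMetric(Metric):
    def fetch_trial_data(self, trial, **kwargs):
        # Pull the finished result back from the server and wrap it in Ax Data.
        mean = fetch_result_from_server(trial.run_metadata["job_id"])  # hypothetical helper
        df = pd.DataFrame.from_records([{
            "arm_name": trial.arm.name,
            "metric_name": self.name,
            "mean": mean,
            "sem": 0.0,
            "trial_index": trial.index,
        }])
        return Data(df=df)


# `experiment` (with runner=RemoteServerRunner() attached) and
# `generation_strategy` are built as in the Scheduler tutorial.
scheduler = Scheduler(
    experiment=experiment,
    generation_strategy=generation_strategy,
    options=SchedulerOptions(),  # parallelism limits, polling intervals, etc.
)
scheduler.run_n_trials(max_trials=25)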

If you need any assistance setting this up, or have any other questions about your specific use case, please feel free to continue responding in this thread.

@mpolson64 mpolson64 self-assigned this Apr 5, 2022
@lena-kashtelyan lena-kashtelyan added the question Further information is requested label Apr 5, 2022
@deng-cy
Author

deng-cy commented Apr 9, 2022

Thanks for your reply! I think the Scheduler should work, but it felt too complicated since I would need to write whole classes even though I only need to customize a little bit. I solved the issue with Python multiprocessing and the Service API:

import multiprocessing as mp

def evaluate(parameter, server_id, q, trial_index):
    # send `parameter` to the server identified by `server_id`, retrieve the result `metric`,
    # then put it on the queue
    q.put({'idx': trial_index, 'res': (metric, 0)})


n_result = 0
q = mp.Queue()
ps = []

for i in range(len(servers_id)):
    parameters, trial_index = ax_client.get_next_trial()
    p = mp.Process(target=evaluate, args=(parameters, servers_id[i], q, trial_index))
    p.start()
    ps.append(p)
while n_result < n_trials:
    res = q.get(block=True)
    n_result += 1
    ax_client.complete_trial(trial_index=res['idx'], raw_data=res['res'])

    for i in range(len(servers_id)):
        if not ps[i].is_alive():
            parameters, trial_index = ax_client.get_next_trial()
            p = mp.Process(target=evaluate, args=(parameters, servers_id[i], q, trial_index))
            p.start()
            ps[i] = p

@deng-cy deng-cy closed this as completed Apr 9, 2022
@sgbaird
Contributor

sgbaird commented Apr 9, 2022

@deng-cy nice job! I'll keep this in mind. I take it you meant Service API (not Server API), correct? Thanks for including a code snippet.

@deng-cy
Author

deng-cy commented Apr 9, 2022

@sgbaird Yeah Service API, I corrected it.

@bernardo-suez

bernardo-suez commented Jun 8, 2022

This is great and I'm trying to use the code, but I have two questions:

  1. Where does the server_id list come from?
  2. Inside evaluate, how do I send the function evaluation to a server (I'm assuming the server can run on a core, no?) based on its server_id?

I tried using Ray but apparently it is not set up yet for multiobjective problems, am I right?

@sgbaird
Contributor

sgbaird commented Nov 16, 2022

I tried using Ray but apparently it is not set up yet for multiobjective problems, am I right?

@bernardo-suez I believe you are correct.

(source: ray-project/ray#8018 (comment)) I wrote my own scheduler and analyzer to deal with multi-objective optimization problems. It was not easy for me, and I spent quite a while doing it.

This also doesn't seem to be on Ray's roadmap.

(source: ray-project/ray#8018 (comment)) For existing schedulers, I have no immediate plan to support multi-objective optimization. However, I'm happy to take any contributions. Note that this is not hard to do - adding multi-objective optimization does not require any lower-level modifications to the framework.

In other words, if you want to do asynchronous multi-objective optimization in Ax, adapting Scheduler might be your best option. You also might be able to save some work by using the Service API to create some of the building blocks and then pass those where needed in the Scheduler API.
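
Here is a rough sketch of that "building blocks" idea (untested; RemoteServerRunner is the runner class sketched earlier in this thread, the objective metrics would similarly need to be custom Metric subclasses, and the exact wiring may differ between Ax versions):

# Sketch only: build the experiment and generation strategy with the Service
# API, then hand them to the Scheduler instead of driving trials manually.
from ax.service.ax_client import AxClient
from ax.service.scheduler import Scheduler, SchedulerOptions

ax_client = AxClient()
ax_client.create_experiment(
    parameters=[
        {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]},
        {"name": "x2", "type": "range", "bounds": [0.0, 15.0]},
    ],
    objective_name="metric",
    minimize=True,
    # (for multi-objective, recent Ax versions accept an `objectives=...` dict here instead)
)

# Attach the runner that ships trials to your servers. The experiment's
# objective metric(s) would also need to be replaced with custom Metric
# subclasses that know how to fetch results (not shown here).
ax_client.experiment.runner = RemoteServerRunner()

scheduler = Scheduler(
    experiment=ax_client.experiment,
    generation_strategy=ax_client.generation_strategy,
    options=SchedulerOptions(),
)
scheduler.run_n_trials(max_trials=25)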

2. Inside evaluate, how do I send the function evaluation to a server (I'm assuming the server can run on a core, no?) based on its server_id?

I think this will be specific to whatever server you're using. For example, sending a job to Amazon AWS vs. Google Cloud vs. an HPC university cluster will each take different forms. In other words, it's specific to your "external client". If your external client doesn't have a way of communicating with the machine running the optimization algorithm (which could be the same machine, btw), then this is something you'll need to implement yourself. For example, you could communicate via a Google sheets page, an external database (SQL, MongoDB, etc.), or an MQTT server.
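
As a purely illustrative example, here is a hypothetical version of deng-cy's evaluate that talks to each server over HTTP. Everything about the server side is made up (the SERVER_URLS mapping, the /evaluate and /result routes, the JSON payloads); substitute whatever protocol your servers actually speak.

# Purely illustrative: the URL mapping, routes, and payloads are hypothetical.
import time

import requests

SERVER_URLS = {
    0: "http://server-0.example.com:8000",
    1: "http://server-1.example.com:8000",
    # ... one entry per server, keyed by server_id
}


def evaluate(parameters, server_id, q, trial_index):
    base = SERVER_URLS[server_id]
    # Submit the parameters to the chosen server.
    job = requests.post(f"{base}/evaluate", json=parameters).json()
    # Poll until the server reports a result.
    while True:
        reply = requests.get(f"{base}/result", params={"job_id": job["job_id"]}).json()
        if reply["status"] == "done":
            break
        time.sleep(10)
    # Report back to the main process, as in the multiprocessing snippet above.
    q.put({"idx": trial_index, "res": (reply["value"], 0)})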

@sgbaird
Contributor

sgbaird commented Nov 15, 2024

Looping back to this: I played around with it a while back.

Here's a simple example of using "plain" Ray (i.e., not Ray Tune) to do work in parallel. This one takes the square of a number.

%pip install ray
import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init(num_cpus=2)


# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `square` tasks automatically.
@ray.remote
def square(x):
    return x * x


# Create some parallel work using a list comprehension, then block until the
# results are ready with `ray.get`.
results = ray.get([square.remote(x) for x in range(100)])

ray.shutdown()

Applying this to Ax Service API batch optimization:

%pip install ax-platform ray
import ray
from ax.service.ax_client import AxClient
from ax.utils.measurement.synthetic_functions import branin

batch_size = 2
num_trials = 11

ax_client = AxClient()
ax_client.create_experiment(
    parameters=[
        {"name": "x1", "type": "range", "bounds": [-5.0, 10.0]},
        {"name": "x2", "type": "range", "bounds": [0.0, 15.0]},
    ],
    objective_name="branin",
    minimize=True,
    # Sets max parallelism to `batch_size` for all steps of the generation strategy.
    choose_generation_strategy_kwargs={
        "num_trials": num_trials,
        "max_parallelism_override": batch_size,
        "enforce_sequential_optimization": False,
    },
)


@ray.remote
def evaluate(parameters):
    return {"branin": branin(parameters["x1"], parameters["x2"])}


n = 0
while n < num_trials:
    curr_batch_size = batch_size if n + batch_size < num_trials else num_trials - n

    trial_mapping, optimization_complete = ax_client.get_next_trials(curr_batch_size)
    n = n + curr_batch_size

    # start running trials in a queue (new trials will start as resources are freed)
    futures = [evaluate.remote(parameters) for parameters in trial_mapping.values()]

    # wait for all trials in the batch to complete before continuing (i.e. blocking)
    results = ray.get(futures)

    # report the completion of trials to the Ax client
    for trial_index, raw_data in zip(trial_mapping.keys(), results):
        ax_client.complete_trial(trial_index=trial_index, raw_data=raw_data)

ray.shutdown()

Copied from scripts/ray_get_reproducer.py and ax_batch_reproducer.py

Perhaps the logic can be adjusted to handle asynchronous cases.
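
For example, here is one way the batch loop above could be rewritten asynchronously (a sketch only, untested; it reuses ax_client, evaluate, batch_size, and num_trials from the snippet above). ray.wait blocks until any one trial finishes, so Ax can complete that trial and immediately generate a replacement instead of waiting for the whole batch.

# Sketch only (untested): asynchronous rewrite of the batch loop using ray.wait.
futures = {}  # map each in-flight Ray future to its Ax trial index

trial_mapping, _ = ax_client.get_next_trials(batch_size)
for trial_index, parameters in trial_mapping.items():
    futures[evaluate.remote(parameters)] = trial_index
launched = len(futures)

while futures:
    # Block until any single trial finishes.
    ready, _ = ray.wait(list(futures.keys()), num_returns=1)
    finished = ready[0]
    ax_client.complete_trial(
        trial_index=futures.pop(finished), raw_data=ray.get(finished)
    )

    # Immediately backfill, keeping up to `batch_size` trials in flight.
    if launched < num_trials:
        new_mapping, _ = ax_client.get_next_trials(1)
        for new_index, new_parameters in new_mapping.items():
            futures[evaluate.remote(new_parameters)] = new_index
            launched += 1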
