Ray scheduler driver and job api #329
Conversation
I left some comments.
Besides those comments, one major thing that is missing right now is tests. I am not entirely sure to what extent we can test all the logic contained in this PR, but we should give it a try. I suggest syncing up with Aliaksandr and Tristan to learn how they test the Slurm, Kubernetes, and Docker schedulers. We should have a similar approach here.
```
for actor_dict in actors_dict:
    bundle = {"CPU": actor_dict["num_cpus"], "GPU": actor_dict["num_gpus"]}
```
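For context, here is a minimal sketch, not the PR's code, of how per-actor bundles like the one above can feed `ray.util.placement_group`; the `actors_dict` contents are invented for illustration, and zero-valued GPU entries are simply omitted here:

```
import ray
from ray.util.placement_group import placement_group

ray.init()

# Hypothetical contents of actor.json after it has been loaded.
actors_dict = [{"num_cpus": 1, "num_gpus": 0}, {"num_cpus": 1, "num_gpus": 0}]

# One bundle per actor.
bundles = []
for actor_dict in actors_dict:
    bundle = {"CPU": actor_dict["num_cpus"]}
    if actor_dict["num_gpus"] > 0:
        bundle["GPU"] = actor_dict["num_gpus"]
    bundles.append(bundle)

pg = placement_group(bundles, strategy="SPREAD")
ray.get(pg.ready())  # block until the cluster can reserve every bundle
```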
I would advise getting @amogkam's feedback on the rest of ray_driver.py. We should pay particular attention to the edge cases: how do we handle failures? For instance, what happens if an actor or a process group initialization raises an error? How do we communicate the root cause back to the user?
Chatted briefly with @d4l3k on what an integration test could look like. What we can do is set up a single-node Ray cluster on the machine where the test is running, which would instantiate a single Ray node with multiple workers, then run the test and tear down the cluster. For multi-node we can leverage something like …, and the last option is that we manage our own Ray cluster for tests, which would be a hassle.
@amogkam is it possible to simulate a Ray cluster on localhost?

@cbalioglu sorry for that request, accidentally clicked.
```
# On driver.py
logging.debug("Reading actor.json")
actors_dict = load_actor_json('actor.json')
```
cc @jiaodong to confirm, but Ray Jobs should automatically set a different working dir for each job that is submitted to the cluster. So even if every job saves to a ray-actors.json, the absolute path of this file on the node should be different for each job.
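A hedged sketch of the isolation described above, using the Ray job submission SDK; the dashboard address, entrypoint, and staging directory are placeholders, not values from the PR:

```
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://127.0.0.1:8265")  # head node dashboard address
job_id = client.submit_job(
    # The driver reads actor.json relative to its own, per-job working directory.
    entrypoint="python ray_driver.py",
    runtime_env={"working_dir": "./staging_dir"},  # uploaded and isolated per job
)
print(job_id)
```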
```
while len(unfinished) > 0:
    finished, unfinished = ray.wait(unfinished)
    # If a failure occurs the ObjectRef will be marked as finished.
    # Calling ray.get will expose the failure as a RayActorError.
```
Suggested change:

```
- # Calling ray.get will expose the failure as a RayActorError.
+ # Calling ray.get will then expose the exception. The same exception that is raised in the actor will be raised on the driver. If the actor is terminated or pre-empted, then a `RayActorError` is raised.
```
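To make those semantics concrete, a small self-contained sketch of the wait loop with explicit error handling; `CommandActor` here is a toy stand-in rather than the PR's actor:

```
import ray
from ray.exceptions import RayActorError


@ray.remote
class CommandActor:
    def exec_module(self) -> None:
        raise RuntimeError("process group initialization failed")  # simulated failure


ray.init()
actors = [CommandActor.remote() for _ in range(2)]
unfinished = [actor.exec_module.remote() for actor in actors]

while len(unfinished) > 0:
    finished, unfinished = ray.wait(unfinished)
    for ref in finished:
        try:
            ray.get(ref)  # re-raises the actor's exception on the driver
        except RayActorError:
            print("actor was terminated or pre-empted")
        except RuntimeError as err:
            print(f"root cause to surface back to the user: {err}")
```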
Hey @cbalioglu @msaroufim, for testing a Ray cluster we have a fake cluster utility that you can run on a single machine: https://docs.ray.io/en/master/fake-autoscaler.html. You can also start a fake multi-node cluster programmatically: https://docs.ray.io/en/master/fake-autoscaler.html#using-ray-cluster-utils-autoscalingcluster. @jiaodong is it possible to submit jobs to the fake cluster?
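Based on the linked docs page, a rough sketch of starting the fake multi-node cluster programmatically inside a test; treat the exact constructor arguments as an assumption to verify against the Ray version in use:

```
import ray
from ray.cluster_utils import AutoscalingCluster

cluster = AutoscalingCluster(
    head_resources={"CPU": 2},
    worker_node_types={
        "cpu_node": {
            "resources": {"CPU": 4},
            "node_config": {},
            "min_workers": 0,
            "max_workers": 2,
        },
    },
)
cluster.start()
ray.init("auto")
try:
    pass  # run the scheduler/driver test against the fake multi-node cluster here
finally:
    ray.shutdown()
    cluster.shutdown()
```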
For testing job submission, as long as we have a running Ray cluster with the dashboard module running on the head node, we can use it for testing. Currently what we do most is start a single-node Ray cluster and submit jobs to the head node. There's a WIP PR that adds a bunch of things to job submission with tests passing: https://github.com/ray-project/ray/pull/19860/files. You should be able to find some test examples there for the Python API, SDK, and HTTP endpoint.
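A rough pytest-style sketch of that single-node flow; the port, entrypoint, and timeout are placeholders, and the `JobSubmissionClient`/`JobStatus` import paths and method names should be double-checked against the Ray version in use:

```
import subprocess
import time

from ray.job_submission import JobStatus, JobSubmissionClient


def test_submit_job_to_local_head_node() -> None:
    # Start a single-node cluster; the dashboard module serves the job API on 8265.
    subprocess.run(["ray", "start", "--head"], check=True)
    try:
        client = JobSubmissionClient("http://127.0.0.1:8265")
        job_id = client.submit_job(entrypoint="python -c \"print('hello')\"")
        deadline = time.time() + 60
        while time.time() < deadline:
            status = client.get_job_status(job_id)
            if status in (JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED):
                break
            time.sleep(1)
        assert client.get_job_status(job_id) == JobStatus.SUCCEEDED
    finally:
        subprocess.run(["ray", "stop"], check=True)
```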
Adding final todos here.

Feedback from Alex 11/23 (PO, P2).

@msaroufim can you rebase this diff so the integ tests run?

- Component test: set up cluster
- Use torchX smoke test
- train.py test
- Usage instructions
Codecov Report

```
@@            Coverage Diff             @@
##             main     #329      +/-   ##
==========================================
+ Coverage   94.73%   94.83%   +0.09%
==========================================
  Files          61       63       +2
  Lines        3229     3347     +118
==========================================
+ Hits         3059     3174     +115
- Misses        170      173       +3
```

Continue to review full report at Codecov.
```
def load_actor_json(filename : str) -> List[Dict]:
    with open(filename) as f:
        actor = json.load(f)
        actor = json.loads(actor)
```
```
def load_actor_json(filename : str) -> List[Dict]:
    with open(filename) as f:
        actor = json.load(f)       # from file to string
        actor = json.loads(actor)  # from string to dict
        return actor
```
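The double decode implies the writer dumped a JSON string into a JSON file. The round trip below is an assumption about the write side, added only to illustrate why both calls are needed:

```
import json
from typing import Dict, List

actors: List[Dict] = [{"command": "python train.py", "num_cpus": 1, "num_gpus": 0}]

with open("actor.json", "w") as f:
    json.dump(json.dumps(actors), f)  # the file now holds a JSON-encoded string of JSON

with open("actor.json") as f:
    roundtrip = json.loads(json.load(f))  # load -> str, loads -> list of dicts

assert roundtrip == actors
```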
torchx/schedulers/ray/ray_driver.py (outdated)
```
    actors: List[RayActor], pgs: List[PlacementGroup]
) -> List[CommandActor]:
    command_actors: List[CommandActor] = []
    address, port = get_address_and_port()
```
That's right, there's no guarantee that the rank 0 worker is placed on the same node that ray_driver is run on. I think you'd want to move this inside the for loop and call it on the rank 0 actor for each group of actors, which would require some changes to the current for loop.

So add a new method to CommandActor:

```
def get_address_and_port(self):
    return get_address_and_port()
```

And then inside the for loop the sequencing should be:

- create all the actors
- get the master address and port from the rank 0 worker
- then pass the master address and port to all the actors, which means that these values cannot be set during the actor init

Happy to chat more about this as well, or provide some code snippets.
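A sketch of that sequencing, with a toy `CommandActor` and a hypothetical `set_master_address` setter standing in for the PR's real actor; the placement-group wiring is only loosely modeled on ray_driver.py:

```
import socket
from typing import Dict, List, Tuple

import ray


def get_address_and_port() -> Tuple[str, int]:
    # Local stand-in for the helper used in ray_driver.py.
    addr = socket.gethostbyname(socket.gethostname())
    with socket.socket() as s:
        s.bind(("", 0))
        port = s.getsockname()[1]
    return addr, port


@ray.remote
class CommandActor:
    def __init__(self, command: str, env: Dict[str, str]) -> None:
        self.command = command
        self.env = dict(env)

    def get_address_and_port(self) -> Tuple[str, int]:
        return get_address_and_port()

    def set_master_address(self, addr: str, port: int) -> None:
        self.env["MASTER_ADDR"] = addr
        self.env["MASTER_PORT"] = str(port)


def create_command_actors(actor_groups, pgs) -> List:
    command_actors = []
    for group, pg in zip(actor_groups, pgs):
        # 1. Create all the actors of this group on their placement group.
        actors = [
            CommandActor.options(placement_group=pg).remote(a.command, a.env)
            for a in group
        ]
        # 2. Ask the rank 0 actor where it actually landed.
        master_addr, master_port = ray.get(actors[0].get_address_and_port.remote())
        # 3. Hand the address/port to every actor after creation.
        ray.get([a.set_master_address.remote(master_addr, master_port) for a in actors])
        command_actors.extend(actors)
    return command_actors
```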
```
)

if app.metadata:
    _logger.warning("The Ray scheduler does not use metadata information.")

def log_iter(
```
As @jiaodong mentioned, log streaming support was added to jobs in Ray 1.10: ray-project/ray#20976.
After upgrading to 1.10, that functionality can probably be leveraged directly.
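A hedged sketch of what log_iter could delegate to once TorchX is on Ray >= 1.10; the method names come from the job submission SDK, but the exact signatures should be verified against the Ray version in use:

```
import asyncio

from ray.job_submission import JobSubmissionClient


def get_logs_once(dashboard_address: str, job_id: str) -> str:
    client = JobSubmissionClient(f"http://{dashboard_address}")
    return client.get_job_logs(job_id)  # full log text captured so far


async def stream_logs(dashboard_address: str, job_id: str) -> None:
    client = JobSubmissionClient(f"http://{dashboard_address}")
    async for chunk in client.tail_job_logs(job_id):  # streams until the job finishes
        print(chunk, end="")


# asyncio.run(stream_logs("127.0.0.1:8265", "raysubmit_..."))
```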
LGTM
@msaroufim has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

This pull request was exported from Phabricator. Differential Revision: D33689705
Force-pushed 54f141f to 05e1912.
Summary:

## To run distributed pytorch test

```
cd torch/torchx/schedulers/test
python -m unittest ray_scheduler_test.py
```

```
2021-11-24 00:53:22,810 INFO worker.py:840 -- Connecting to existing Ray cluster at address: 172.31.2.209:6379
2021-11-24 00:53:23,127 INFO sdk.py:144 -- Uploading package gcs://_ray_pkg_e212ba1ff8e15e24.zip.
2021-11-24 00:53:23,128 INFO packaging.py:352 -- Creating a file package for local directory '/tmp/tmp9copqxd2'.
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
status: RUNNING
status: SUCCEEDED
2021-11-24 00:53:25,521 INFO worker.py:840 -- Connecting to existing Ray cluster at address: 172.31.2.209:6379
(CommandActor pid=3575) initializing `gloo` process group
(CommandActor pid=3574) initializing `gloo` process group
(CommandActor pid=3575) successfully initialized process group
(CommandActor pid=3575) rank: 1, actual world_size: 2, computed world_size: 2
(CommandActor pid=3574) successfully initialized process group
(CommandActor pid=3574) rank: 0, actual world_size: 2, computed world_size: 2
```

## To run distributed pytorch test using torchX CLI

```
# Setup cluster and get a HEAD NODE IP
ray up -y ray_cluster.yaml
pip install torchx[dev]

# Get a job ID from deployed job
torchx run -s ray -cfg dashboard_address=34.209.89.185:20002,working_dir=test_dir utils.binary --entrypoint ray_simple.py

# Use job ID to get logs or job status
torchx describe ray://torchx/34.209.89.185:20002-raysubmit_aKvezN3NyA2mqZeW
torchx log ray://torchx/34.209.89.185:20002-raysubmit_aKvezN3NyA2mqZeW
```

## Or

```
ray up -y ray_cluster.yaml
pip install torchx[dev]
torchx run -s ray -cfg cluster_config_file=ray_cluster.yaml,working_dir=test_dir utils.binary --entrypoint ray_simple.py
```

## ray_simple.py

The unit test `ray_scheduler_test.py` is more interesting.

```
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

def main() -> None:
    print("hello")
    return

if __name__ == "__main__":
    main()
```

Pull Request resolved: #329

Reviewed By: aivanou

Differential Revision: D33689705

Pulled By: msaroufim

fbshipit-source-id: b0fb5b95cf148ab3047a7df43e196b05d443a6d3