Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RayCluster with create_settings #118

Merged
merged 25 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f62693c
Add Cobalt support to RayCluster
al-rigazzi Oct 22, 2021
26cd07b
Merge branch 'develop' of https://github.com/CrayLabs/SmartSim into r…
al-rigazzi Oct 22, 2021
eea9c5a
Add condition for short parsed lines
al-rigazzi Oct 22, 2021
519e00d
Apply style
al-rigazzi Oct 22, 2021
9a3a3df
Update Ray init to use new standard
al-rigazzi Oct 22, 2021
8a9a4ff
Add control over ray temp files on cobalt
al-rigazzi Oct 22, 2021
0073ada
Add queue to Cobalt batch ray, minor fixes
al-rigazzi Oct 22, 2021
1711865
Give more time to Ray cluster to spin up
al-rigazzi Oct 22, 2021
e2c675c
Merge remote-tracking branch 'upstream/develop' into ray_cobalt
al-rigazzi Dec 9, 2021
02a5d5f
Merge branch 'develop' of https://github.com/CrayLabs/SmartSim into r…
al-rigazzi Dec 9, 2021
e6f28a5
Add create_run/batch_settings to RayCluster
al-rigazzi Dec 15, 2021
d1754ca
Add set_tasks to ray run_settings
al-rigazzi Dec 15, 2021
f7706a1
Remove set_tasks from RayCluster
al-rigazzi Dec 15, 2021
087523e
Add run_command to ray pbs test
al-rigazzi Dec 15, 2021
d68218c
Add run_command to all ray tests
al-rigazzi Dec 15, 2021
3d5ef29
Update RayCluster docs
al-rigazzi Dec 15, 2021
5f0465b
Merge branch 'develop' of https://github.com/CrayLabs/SmartSim into r…
al-rigazzi Dec 15, 2021
1c1b35c
Change RayCluster run_command mechanism
al-rigazzi Dec 15, 2021
7cccffd
Update test for new Ray API
al-rigazzi Dec 15, 2021
8b66b02
Unify tests using Ray, fix tabulate
al-rigazzi Dec 15, 2021
90745c4
Apply style
al-rigazzi Dec 16, 2021
aea96d5
Address review comments and delete unused imports
al-rigazzi Jan 10, 2022
d278e7b
Shorten ray docstrings
al-rigazzi Jan 11, 2022
adcb97c
Apply style, shorten docstrings.
al-rigazzi Jan 11, 2022
5c2ae97
Enforce one task per ray entity
al-rigazzi Jan 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
PBSOrchestrator, Orchestrator,
LSFOrchestrator
)
from smartsim.error.errors import SSUnsupportedError
from smartsim.settings import (
SrunSettings, AprunSettings,
JsrunSettings, RunSettings
Expand Down
2 changes: 1 addition & 1 deletion smartsim/database/cobaltOrchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
:type batch: bool, optional
:param hosts: specify hosts to launch on, defaults to None. Optional if not launching with OpenMPI
:type hosts: list[str]
:param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to "aprun".
:param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to ``aprun``.
:type run_command: str, optional
:param interface: network interface to use, defaults to "ipogif0"
:type interface: str, optional
Expand Down
179 changes: 85 additions & 94 deletions smartsim/exp/ray/raycluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

from ...entity import EntityList, SmartSimEntity
from ...error import SSConfigError, SSUnsupportedError
from ...settings import AprunSettings, QsubBatchSettings, SbatchSettings, SrunSettings
from ...utils import delete_elements, get_logger
from ...settings.settings import create_batch_settings, create_run_settings
from ...utils import get_logger
from ...utils.helpers import expand_exe_path, init_default

logger = get_logger(__name__)
Expand All @@ -48,25 +48,28 @@ class RayCluster(EntityList):
:type path: str
:param ray_port: Port at which the head node will be running.
:type ray_port: int
:param ray_args: Arguments to be passed to Ray executable as `--key=value`, or `--key` if `value` is set to `None`.
:param ray_args: Arguments to be passed to Ray executable.
:type ray_args: dict[str,str]
:param num_nodes: Number of hosts, includes 1 head node and all worker nodes.
:type num_nodes: int
:param run_args: Arguments to pass to launcher to specify details such as partition, time, and so on.
:param run_args: Arguments to pass to launcher to specify details such as partition or time.
:type run_args: dict[str,str]
:param batch_args: Additional batch arguments passed to launcher when running batch jobs.
:type batch_args: dict[str,str]
:param launcher: Name of launcher to use for starting the cluster.
:type launcher: str
:param interface: Name of network interface the cluster nodes should bind to.
:type interface: str
:param alloc: ID of allocation to run on, only used if launcher is Slurm and allocation is
obtained with ``ray.slurm.get_allocation``
:param alloc: ID of allocation to run on, if obtained with ``smartsim.slurm.get_allocation``
:type alloc: int
:param batch: Whether cluster should be launched as batch file, ignored when ``launcher`` is `local`
:type batch: bool
:param time: The walltime the cluster will be running for
:type time: str
:param run_command: specify launch binary, defaults to automatic selection.
:type run_command: str
:param hosts: specify hosts to launch on, defaults to None. Optional if not launching with OpenMPI.
:type hosts: str, list[str]
:param password: Password to use for Redis server, which is passed as `--redis_password` to `ray start`.
Can be set to
- `auto`: a strong password will be generated internally
Expand All @@ -90,13 +93,18 @@ def __init__(
time="01:00:00",
interface="ipogif0",
alloc=None,
run_command=None,
host_list=None,
password="auto",
**kwargs,
):
launcher = launcher.lower()
if launcher not in ["slurm", "pbs"]:
supported_launchers = ["slurm", "pbs", "cobalt"]
if launcher not in supported_launchers:
raise SSUnsupportedError(
"Only Slurm and PBS launchers are supported by RayCluster"
"The supported launchers for RayCluster are",
*[f"{launcher_name}," for launcher_name in supported_launchers],
f"but {launcher} was provided.",
)

if password:
Expand Down Expand Up @@ -129,12 +137,23 @@ def __init__(
interface=interface,
alloc=alloc,
num_nodes=num_nodes,
run_command=run_command if run_command else "auto",
host_list=host_list,
**kwargs,
)
if batch:
self._build_batch_settings(num_nodes, time, batch_args, launcher)
self.batch_settings = create_batch_settings(
Spartee marked this conversation as resolved.
Show resolved Hide resolved
launcher=launcher,
nodes=num_nodes,
time=time,
batch_args=batch_args,
**kwargs,
)
self.ray_head_address = None

if host_list:
self.set_hosts(host_list=host_list, launcher=launcher)

@property
def batch(self):
try:
Expand All @@ -144,6 +163,31 @@ def batch(self):
except AttributeError:
return False

def set_hosts(self, host_list, launcher):
"""Specify the hosts for the ``RayCluster`` to launch on. This is
optional, unless ``run_command`` is `mpirun`.

:param host_list: list of hosts (compute node names)
:type host_list: str | list[str]
:raises TypeError: if wrong type
"""
if isinstance(host_list, str):
host_list = [host_list.strip()]
if not isinstance(host_list, list):
raise TypeError("host_list argument must be a list of strings")
if not all([isinstance(host, str) for host in host_list]):
raise TypeError("host_list argument must be list of strings")
# TODO check length
if self.batch:
self.batch_settings.set_hostlist(host_list)
for host, node in zip(host_list, self.entities):
# Aprun doesn't like settings hosts in batch launch
if launcher == "pbs" or launcher == "cobalt":
if not self.batch:
node.run_settings.set_hostlist([host])
else:
node.run_settings.set_hostlist([host])

def _initialize_entities(self, **kwargs):

ray_port = kwargs.get("ray_port", 6789)
Expand All @@ -153,16 +197,18 @@ def _initialize_entities(self, **kwargs):
interface = kwargs.get("interface", "ipogif0")
num_nodes = kwargs.get("num_nodes", 0)
alloc = kwargs.get("alloc", None)
run_command = kwargs.get("run_command", None)

ray_head = RayHead(
name="ray_head",
path=self.path,
ray_password=self._ray_password,
ray_port=ray_port,
launcher=launcher,
run_args=run_args,
ray_args=ray_args,
run_args=run_args.copy(),
ray_args=ray_args.copy(),
interface=interface,
run_command=run_command,
alloc=alloc,
)

Expand All @@ -172,28 +218,17 @@ def _initialize_entities(self, **kwargs):
worker_model = RayWorker(
name=f"ray_worker_{worker_id}",
path=self.path,
run_args=run_args,
run_args=run_args.copy(),
ray_port=ray_port,
ray_password=self._ray_password,
ray_args=ray_args,
ray_args=ray_args.copy(),
interface=interface,
run_command=run_command,
launcher=launcher,
alloc=alloc,
)
self.entities.append(worker_model)

def _build_batch_settings(self, num_nodes, time, batch_args, launcher):
if launcher == "pbs":
self.batch_settings = QsubBatchSettings(
nodes=num_nodes, time=time, batch_args=batch_args
)
elif launcher == "slurm":
self.batch_settings = SbatchSettings(
nodes=num_nodes, time=time, batch_args=batch_args
)
else:
raise SSUnsupportedError("Only PBS and Slurm launchers are supported")

def get_head_address(self):
"""Return address of head node

Expand Down Expand Up @@ -250,7 +285,7 @@ def parse_ray_head_node_address(head_log):
:rtype: str
"""

max_attempts = 12
max_attempts = 24
attempts = 0
while not os.path.isfile(head_log):
_time.sleep(5)
Expand Down Expand Up @@ -291,8 +326,10 @@ def __init__(
ray_args=None,
launcher="slurm",
interface="ipogif0",
run_command=None,
alloc=None,
dash_port=8265,
**kwargs,
):
self.dashboard_port = dash_port
self.batch_settings = None
Expand All @@ -305,7 +342,16 @@ def __init__(
ray_port, ray_password, interface, ray_args
)

run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args)
run_settings = create_run_settings(
launcher=launcher,
exe="python",
exe_args=ray_exe_args,
run_args=run_args,
run_command=run_command if run_command else "auto",
alloc=alloc,
**kwargs,
)

super().__init__(name, path, run_settings)

def _build_ray_exe_args(self, ray_port, ray_password, interface, ray_args):
Expand Down Expand Up @@ -336,41 +382,6 @@ def _build_ray_exe_args(self, ray_port, ray_password, interface, ray_args):

return " ".join(ray_starter_args)

def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args):

if launcher == "slurm":
run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args)
elif launcher == "pbs":
run_settings = self._build_pbs_settings(run_args, ray_exe_args)
else:
raise SSUnsupportedError("Only slurm, and pbs launchers are supported.")

run_settings.set_tasks(1)
run_settings.set_tasks_per_node(1)
return run_settings

def _build_pbs_settings(self, run_args, ray_args):

aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args)
aprun_settings.set_tasks(1)

return aprun_settings

def _build_srun_settings(self, alloc, run_args, ray_args):

delete_elements(run_args, ["oversubscribe"])

run_args["unbuffered"] = None

srun_settings = SrunSettings(
"python",
exe_args=ray_args,
run_args=run_args,
alloc=alloc,
)
srun_settings.set_nodes(1)
return srun_settings


class RayWorker(SmartSimEntity):
def __init__(
Expand All @@ -383,7 +394,9 @@ def __init__(
ray_args=None,
interface="ipogif0",
launcher="slurm",
run_command=None,
alloc=None,
**kwargs,
):

self.batch_settings = None
Expand All @@ -396,7 +409,16 @@ def __init__(
ray_password, ray_args, ray_port, interface
)

run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args)
run_settings = create_run_settings(
launcher=launcher,
exe="python",
exe_args=ray_exe_args,
run_args=run_args,
run_command=run_command,
alloc=alloc,
**kwargs,
)

super().__init__(name, path, run_settings)

@property
Expand Down Expand Up @@ -441,34 +463,3 @@ def _build_ray_exe_args(self, ray_password, ray_args, ray_port, interface):
ray_starter_args += extra_ray_args

return " ".join(ray_starter_args)

def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args):

if launcher == "slurm":
run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args)
elif launcher == "pbs":
run_settings = self._build_pbs_settings(run_args, ray_exe_args)
else:
raise SSUnsupportedError("Only slurm, and pbs launchers are supported.")

run_settings.set_tasks(1)
return run_settings

def _build_pbs_settings(self, run_args, ray_args):

aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args)

return aprun_settings

def _build_srun_settings(self, alloc, run_args, ray_args):
delete_elements(run_args, ["oversubscribe"])
run_args["unbuffered"] = None

srun_settings = SrunSettings(
"python",
exe_args=ray_args,
run_args=run_args,
alloc=alloc,
)
srun_settings.set_nodes(1)
return srun_settings
11 changes: 5 additions & 6 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
import os.path as osp
import time
from os import getcwd
from pprint import pformat

Expand Down Expand Up @@ -207,11 +207,8 @@ def get_status(self, *args):
statuses = []
for entity in manifest.models:
statuses.append(self._control.get_entity_status(entity))
for entity_list in manifest.ensembles:
for entity_list in manifest.all_entity_lists:
statuses.extend(self._control.get_entity_list_status(entity_list))
orchestrator = manifest.db
if orchestrator:
statuses.extend(self._control.get_entity_list_status(orchestrator))
return statuses
except SmartSimError as e:
logger.error(e)
Expand Down Expand Up @@ -500,7 +497,9 @@ def summary(self, format="github"):
job.history.returns[run],
]
)
return tabulate(values, headers, showindex=True, tablefmt=format)
return tabulate(
values, headers, showindex=True, tablefmt=format, missingval="None"
)

def _launch_summary(self, manifest):
"""Experiment pre-launch summary of entities that will be launched
Expand Down
Loading