Skip to content

Commit

Permalink
Merge pull request #118 from al-rigazzi/ray_create_settings
Browse files Browse the repository at this point in the history
Ray cluster Cobalt support (#29)

This PR simplifies how RunSettings are created in RayCluster.
For all launchers, a unified approach is adopted, using create_settings
and automatic launch binary detection.

[ committed by @al-rigazzi ]
[ reviewed by @Spartee ]
  • Loading branch information
al-rigazzi authored Jan 12, 2022
2 parents 2096cc2 + 5c2ae97 commit ab4e9bc
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 436 deletions.
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
PBSOrchestrator, Orchestrator,
LSFOrchestrator
)
from smartsim.error.errors import SSUnsupportedError
from smartsim.settings import (
SrunSettings, AprunSettings,
JsrunSettings, RunSettings
Expand Down
2 changes: 1 addition & 1 deletion smartsim/database/cobaltOrchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
:type batch: bool, optional
:param hosts: specify hosts to launch on, defaults to None. Optional if not launching with OpenMPI
:type hosts: list[str]
:param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to "aprun".
:param run_command: specify launch binary. Options are ``mpirun`` and ``aprun``, defaults to ``aprun``.
:type run_command: str, optional
:param interface: network interface to use, defaults to "ipogif0"
:type interface: str, optional
Expand Down
185 changes: 91 additions & 94 deletions smartsim/exp/ray/raycluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

from ...entity import EntityList, SmartSimEntity
from ...error import SSConfigError, SSUnsupportedError
from ...settings import AprunSettings, QsubBatchSettings, SbatchSettings, SrunSettings
from ...utils import delete_elements, get_logger
from ...settings.settings import create_batch_settings, create_run_settings
from ...utils import get_logger
from ...utils.helpers import expand_exe_path, init_default

logger = get_logger(__name__)
Expand All @@ -48,25 +48,28 @@ class RayCluster(EntityList):
:type path: str
:param ray_port: Port at which the head node will be running.
:type ray_port: int
:param ray_args: Arguments to be passed to Ray executable as `--key=value`, or `--key` if `value` is set to `None`.
:param ray_args: Arguments to be passed to Ray executable.
:type ray_args: dict[str,str]
:param num_nodes: Number of hosts, includes 1 head node and all worker nodes.
:type num_nodes: int
:param run_args: Arguments to pass to launcher to specify details such as partition, time, and so on.
:param run_args: Arguments to pass to launcher to specify details such as partition or time.
:type run_args: dict[str,str]
:param batch_args: Additional batch arguments passed to launcher when running batch jobs.
:type batch_args: dict[str,str]
:param launcher: Name of launcher to use for starting the cluster.
:type launcher: str
:param interface: Name of network interface the cluster nodes should bind to.
:type interface: str
:param alloc: ID of allocation to run on, only used if launcher is Slurm and allocation is
obtained with ``ray.slurm.get_allocation``
:param alloc: ID of allocation to run on, if obtained with ``smartsim.slurm.get_allocation``
:type alloc: int
:param batch: Whether cluster should be launched as batch file, ignored when ``launcher`` is `local`
:type batch: bool
:param time: The walltime the cluster will be running for
:type time: str
:param run_command: specify launch binary, defaults to automatic selection.
:type run_command: str
:param hosts: specify hosts to launch on, defaults to None. Optional if not launching with OpenMPI.
:type hosts: str, list[str]
:param password: Password to use for Redis server, which is passed as `--redis_password` to `ray start`.
Can be set to
- `auto`: a strong password will be generated internally
Expand All @@ -90,13 +93,18 @@ def __init__(
time="01:00:00",
interface="ipogif0",
alloc=None,
run_command=None,
host_list=None,
password="auto",
**kwargs,
):
launcher = launcher.lower()
if launcher not in ["slurm", "pbs"]:
supported_launchers = ["slurm", "pbs", "cobalt"]
if launcher not in supported_launchers:
raise SSUnsupportedError(
"Only Slurm and PBS launchers are supported by RayCluster"
"The supported launchers for RayCluster are",
*[f"{launcher_name}," for launcher_name in supported_launchers],
f"but {launcher} was provided.",
)

if password:
Expand Down Expand Up @@ -129,12 +137,23 @@ def __init__(
interface=interface,
alloc=alloc,
num_nodes=num_nodes,
run_command=run_command if run_command else "auto",
host_list=host_list,
**kwargs,
)
if batch:
self._build_batch_settings(num_nodes, time, batch_args, launcher)
self.batch_settings = create_batch_settings(
launcher=launcher,
nodes=num_nodes,
time=time,
batch_args=batch_args,
**kwargs,
)
self.ray_head_address = None

if host_list:
self.set_hosts(host_list=host_list, launcher=launcher)

@property
def batch(self):
try:
Expand All @@ -144,6 +163,31 @@ def batch(self):
except AttributeError:
return False

def set_hosts(self, host_list, launcher):
"""Specify the hosts for the ``RayCluster`` to launch on. This is
optional, unless ``run_command`` is `mpirun`.
:param host_list: list of hosts (compute node names)
:type host_list: str | list[str]
:raises TypeError: if wrong type
"""
if isinstance(host_list, str):
host_list = [host_list.strip()]
if not isinstance(host_list, list):
raise TypeError("host_list argument must be a list of strings")
if not all([isinstance(host, str) for host in host_list]):
raise TypeError("host_list argument must be list of strings")
# TODO check length
if self.batch:
self.batch_settings.set_hostlist(host_list)
for host, node in zip(host_list, self.entities):
# Aprun doesn't like settings hosts in batch launch
if launcher == "pbs" or launcher == "cobalt":
if not self.batch:
node.run_settings.set_hostlist([host])
else:
node.run_settings.set_hostlist([host])

def _initialize_entities(self, **kwargs):

ray_port = kwargs.get("ray_port", 6789)
Expand All @@ -153,16 +197,18 @@ def _initialize_entities(self, **kwargs):
interface = kwargs.get("interface", "ipogif0")
num_nodes = kwargs.get("num_nodes", 0)
alloc = kwargs.get("alloc", None)
run_command = kwargs.get("run_command", None)

ray_head = RayHead(
name="ray_head",
path=self.path,
ray_password=self._ray_password,
ray_port=ray_port,
launcher=launcher,
run_args=run_args,
ray_args=ray_args,
run_args=run_args.copy(),
ray_args=ray_args.copy(),
interface=interface,
run_command=run_command,
alloc=alloc,
)

Expand All @@ -172,28 +218,17 @@ def _initialize_entities(self, **kwargs):
worker_model = RayWorker(
name=f"ray_worker_{worker_id}",
path=self.path,
run_args=run_args,
run_args=run_args.copy(),
ray_port=ray_port,
ray_password=self._ray_password,
ray_args=ray_args,
ray_args=ray_args.copy(),
interface=interface,
run_command=run_command,
launcher=launcher,
alloc=alloc,
)
self.entities.append(worker_model)

def _build_batch_settings(self, num_nodes, time, batch_args, launcher):
if launcher == "pbs":
self.batch_settings = QsubBatchSettings(
nodes=num_nodes, time=time, batch_args=batch_args
)
elif launcher == "slurm":
self.batch_settings = SbatchSettings(
nodes=num_nodes, time=time, batch_args=batch_args
)
else:
raise SSUnsupportedError("Only PBS and Slurm launchers are supported")

def get_head_address(self):
"""Return address of head node
Expand Down Expand Up @@ -250,7 +285,7 @@ def parse_ray_head_node_address(head_log):
:rtype: str
"""

max_attempts = 12
max_attempts = 24
attempts = 0
while not os.path.isfile(head_log):
_time.sleep(5)
Expand Down Expand Up @@ -291,8 +326,10 @@ def __init__(
ray_args=None,
launcher="slurm",
interface="ipogif0",
run_command=None,
alloc=None,
dash_port=8265,
**kwargs,
):
self.dashboard_port = dash_port
self.batch_settings = None
Expand All @@ -305,7 +342,19 @@ def __init__(
ray_port, ray_password, interface, ray_args
)

run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args)
run_settings = create_run_settings(
launcher=launcher,
exe="python",
exe_args=ray_exe_args,
run_args=run_args,
run_command=run_command if run_command else "auto",
alloc=alloc,
**kwargs,
)

run_settings.set_tasks_per_node(1)
run_settings.set_tasks(1)

super().__init__(name, path, run_settings)

def _build_ray_exe_args(self, ray_port, ray_password, interface, ray_args):
Expand Down Expand Up @@ -336,41 +385,6 @@ def _build_ray_exe_args(self, ray_port, ray_password, interface, ray_args):

return " ".join(ray_starter_args)

def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args):

if launcher == "slurm":
run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args)
elif launcher == "pbs":
run_settings = self._build_pbs_settings(run_args, ray_exe_args)
else:
raise SSUnsupportedError("Only slurm, and pbs launchers are supported.")

run_settings.set_tasks(1)
run_settings.set_tasks_per_node(1)
return run_settings

def _build_pbs_settings(self, run_args, ray_args):

aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args)
aprun_settings.set_tasks(1)

return aprun_settings

def _build_srun_settings(self, alloc, run_args, ray_args):

delete_elements(run_args, ["oversubscribe"])

run_args["unbuffered"] = None

srun_settings = SrunSettings(
"python",
exe_args=ray_args,
run_args=run_args,
alloc=alloc,
)
srun_settings.set_nodes(1)
return srun_settings


class RayWorker(SmartSimEntity):
def __init__(
Expand All @@ -383,7 +397,9 @@ def __init__(
ray_args=None,
interface="ipogif0",
launcher="slurm",
run_command=None,
alloc=None,
**kwargs,
):

self.batch_settings = None
Expand All @@ -396,7 +412,19 @@ def __init__(
ray_password, ray_args, ray_port, interface
)

run_settings = self._build_run_settings(launcher, alloc, run_args, ray_exe_args)
run_settings = create_run_settings(
launcher=launcher,
exe="python",
exe_args=ray_exe_args,
run_args=run_args,
run_command=run_command,
alloc=alloc,
**kwargs,
)

run_settings.set_tasks_per_node(1)
run_settings.set_tasks(1)

super().__init__(name, path, run_settings)

@property
Expand Down Expand Up @@ -441,34 +469,3 @@ def _build_ray_exe_args(self, ray_password, ray_args, ray_port, interface):
ray_starter_args += extra_ray_args

return " ".join(ray_starter_args)

def _build_run_settings(self, launcher, alloc, run_args, ray_exe_args):

if launcher == "slurm":
run_settings = self._build_srun_settings(alloc, run_args, ray_exe_args)
elif launcher == "pbs":
run_settings = self._build_pbs_settings(run_args, ray_exe_args)
else:
raise SSUnsupportedError("Only slurm, and pbs launchers are supported.")

run_settings.set_tasks(1)
return run_settings

def _build_pbs_settings(self, run_args, ray_args):

aprun_settings = AprunSettings("python", exe_args=ray_args, run_args=run_args)

return aprun_settings

def _build_srun_settings(self, alloc, run_args, ray_args):
delete_elements(run_args, ["oversubscribe"])
run_args["unbuffered"] = None

srun_settings = SrunSettings(
"python",
exe_args=ray_args,
run_args=run_args,
alloc=alloc,
)
srun_settings.set_nodes(1)
return srun_settings
9 changes: 4 additions & 5 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,8 @@ def get_status(self, *args):
statuses = []
for entity in manifest.models:
statuses.append(self._control.get_entity_status(entity))
for entity_list in manifest.ensembles:
for entity_list in manifest.all_entity_lists:
statuses.extend(self._control.get_entity_list_status(entity_list))
orchestrator = manifest.db
if orchestrator:
statuses.extend(self._control.get_entity_list_status(orchestrator))
return statuses
except SmartSimError as e:
logger.error(e)
Expand Down Expand Up @@ -507,7 +504,9 @@ def summary(self, format="github"):
job.history.returns[run],
]
)
return tabulate(values, headers, showindex=True, tablefmt=format)
return tabulate(
values, headers, showindex=True, tablefmt=format, missingval="None"
)

def _launch_summary(self, manifest):
"""Experiment pre-launch summary of entities that will be launched
Expand Down
18 changes: 10 additions & 8 deletions smartsim/launcher/cobalt/cobaltParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@
def parse_cobalt_step_status(output, step_id):
status = "NOTFOUND"
for line in output.split("\n"):
if line.split()[0] == step_id:
line = line.split()
status = line[1]
break
fields = line.split()
if len(fields) >= 2:
if fields[0] == step_id:
status = fields[1]
break
return status


Expand All @@ -47,10 +48,11 @@ def parse_cobalt_step_id(output, step_name):
"""
step_id = None
for line in output.split("\n"):
if line.split()[0] == step_name:
line = line.split()
step_id = line[1]
break
fields = line.split()
if len(fields) >= 2:
if fields[0] == step_name:
step_id = fields[1]
break
return step_id


Expand Down
Loading

0 comments on commit ab4e9bc

Please sign in to comment.