Skip to content

Commit

Permalink
Model as Batch Job (#245)
Browse files Browse the repository at this point in the history
Allows users to launch individual models through an experiment as a batch job with batch settings.

If a model with batch settings is added to a higher order entity (e.g. an ensemble), the batch settings of the higher order entity will be used and the batch settings of the model will be ignored in favor of the batch settings of the higher entity.

[ committed by @MattToast ]
[ reviewed by @al-rigazzi, @ashao ]
  • Loading branch information
MattToast authored Jan 28, 2023
1 parent 7bb68f3 commit dc8f11d
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 26 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.vscode
**/*.swp
__pycache__
.ipynb_checkpoints
.pytest_cache/
Expand All @@ -22,4 +23,4 @@ smartsim/_core/bin/*-server
smartsim/_core/bin/*-cli

# created upon install
smartsim/_core/lib
smartsim/_core/lib
12 changes: 10 additions & 2 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,23 @@ Description

- Fix bug in colocated database entrypoint when loading PyTorch models
- Add support for RedisAI 1.2.7, pyTorch 1.11.0, Tensorflow 2.8.0, ONNXRuntime 1.11.1
- Allow for models to be launched independently as batch jobs

Detailed Notes

- Fix bug in colocated database entrypoint stemming from uninitialized variables. This bug affects PyTorch models being loaded into the database. (PR237_)
- Fix bug in colocated database entrypoint stemming from uninitialized variables. This bug affects PyTorch models being loaded into the database. (PR237_)
- The release of RedisAI 1.2.7 allows us to update support for recent versions of pyTorch, Tensorflow, and ONNX (PR234_)
- Make installation of correct Torch backend more reliable according to instruction from pyTorch
- Models were given a `batch_settings` attribute. When launching a model through `Experiment.start`
the `Experiment` will first check for a non-nullish value at that attribute. If the check is
satisfied, the `Experiment` will attempt to wrap the underlying run command in a batch job using
the object referenced at `Model.batch_settings` as the batch settings for the job. If the check
is not satisfied, the `Model` is launched in the traditional manner as a job step. (PR245_)

.. _PR237: https://github.com/CrayLabs/SmartSim/pull/237
.. _PR234: https://github.com/CrayLabs/SmartSim/pull/234
.. _PR245: https://github.com/CrayLabs/SmartSim/pull/245


0.4.1
-----
Expand Down Expand Up @@ -282,4 +290,4 @@ SmartRedis
.. _changelog:

.. include:: ../smartredis/doc/changelog.rst
:start-line: 3
:start-line: 3
29 changes: 23 additions & 6 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,23 @@ def _launch(self, manifest):
job_steps = [(self._create_job_step(e), e) for e in elist.entities]
steps.extend(job_steps)

# models themselves cannot be batch steps
# models themselves cannot be batch steps. If batch settings are
# attached, wrap them in an anonymous batch job step
for model in manifest.models:
job_step = self._create_job_step(model)
steps.append((job_step, model))
if model.batch_settings:
anon_entity_list = _AnonymousBatchJob(
model.name, model.path, model.batch_settings
)
anon_entity_list.entities.append(model)
batch_step = self._create_batch_job_step(anon_entity_list)
steps.append((batch_step, model))
else:
job_step = self._create_job_step(model)
steps.append((job_step, model))

# launch steps
for job_step in steps:
self._launch_step(*job_step)
for step, entity in steps:
self._launch_step(step, entity)

def _launch_orchestrator(self, orchestrator):
"""Launch an Orchestrator instance
Expand Down Expand Up @@ -381,7 +390,6 @@ def _launch_step(self, job_step, entity):
:type entity: SmartSimEntity
:raises SmartSimError: if launch fails
"""

try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand Down Expand Up @@ -641,3 +649,12 @@ def _set_dbobjects(self, manifest):
for db_script in entity._db_scripts:
if db_script not in ensemble._db_scripts:
set_script(db_script, client)


class _AnonymousBatchJob(EntityList):
    """Disposable ``EntityList`` wrapper around a single model.

    The batch-step machinery operates on ``EntityList`` instances, so a
    model with its own batch settings is wrapped here before a batch job
    step is created for it.
    """

    def __init__(self, name, path, batch_settings, **kwargs):
        super().__init__(name, path)
        self.batch_settings = batch_settings

    def _initialize_entities(self, **kwargs):
        # entities are appended by the caller after construction;
        # nothing to build here
        pass
5 changes: 4 additions & 1 deletion smartsim/_core/control/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,14 @@ def __str__(self):
if ensemble.batch:
s += f"{str(ensemble.batch_settings)}\n"
s += "\n"

if self.models:
s += m_header
for model in self.models:
s += f"{model.name}\n"
s += f"{str(model.run_settings)}\n"
if model.batch_settings:
s += f"{model.batch_settings}\n"
s += f"{model.run_settings}\n"
if model.params:
s += f"Parameters: \n{fmt_dict(model.params)}\n"
s += "\n"
Expand Down
8 changes: 7 additions & 1 deletion smartsim/entity/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import warnings

class Model(SmartSimEntity):
def __init__(self, name, params, path, run_settings, params_as_args=None):
def __init__(
self, name, params, path, run_settings, params_as_args=None, batch_settings=None
):
"""Initialize a ``Model``
:param name: name of the model
Expand All @@ -50,12 +52,16 @@ def __init__(self, name, params, path, run_settings, params_as_args=None):
interpreted as command line arguments to
be added to run_settings
:type params_as_args: list[str]
:param batch_settings: Launcher settings for running the individual
model as a batch job, defaults to None
:type batch_settings: BatchSettings | None
"""
super().__init__(name, path, run_settings)
self.params = params
self.params_as_args = params_as_args
self.incoming_entities = []
self._key_prefixing_enabled = False
self.batch_settings = batch_settings
self._db_models = []
self._db_scripts = []
self.files = None
Expand Down
30 changes: 26 additions & 4 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,13 @@ def create_ensemble(
raise

def create_model(
self, name, run_settings, params=None, path=None, enable_key_prefixing=False
self,
name,
run_settings,
params=None,
path=None,
enable_key_prefixing=False,
batch_settings=None,
):
"""Create a general purpose ``Model``
Expand All @@ -436,8 +442,19 @@ def create_model(
references to pieces of a workflow that can be parameterized,
and executed.
``Model`` instances can be launched sequentially or as a batch
by adding them into an ``Ensemble``.
``Model`` instances can be launched sequentially, as a batch job,
or as a group by adding them into an ``Ensemble``.
All models require a reference to run settings to specify which
executable to launch as well as provide options for how to launch
the executable with the underlying WLM. Furthermore, a reference
to a set of batch settings can be added to launch the model
as a batch job through ``Experiment.start``. If a model with
a reference to a set of batch settings is added to a larger
entity with its own set of batch settings (e.g. an
``Ensemble``), the batch settings of the larger entity will take
precedence and the batch settings of the model will be
ignored.
Parameters supplied in the `params` argument can be written into
configuration files supplied at runtime to the model through
Expand Down Expand Up @@ -490,14 +507,19 @@ def create_model(
be prefixed with the ``Model`` name.
Default is True.
:type enable_key_prefixing: bool, optional
:param batch_settings: Settings to run model individually as a batch job,
defaults to None
:type batch_settings: BatchSettings | None
:raises SmartSimError: if initialization fails
:return: the created ``Model``
:rtype: Model
"""
path = init_default(getcwd(), path, str)
params = init_default({}, params, dict)
try:
new_model = Model(name, params, path, run_settings)
new_model = Model(
name, params, path, run_settings, batch_settings=batch_settings
)
if enable_key_prefixing:
new_model.enable_key_prefixing()
return new_model
Expand Down
26 changes: 26 additions & 0 deletions tests/full_wlm/test_generic_batch_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@
pytestmark = pytest.mark.skip(reason="Not testing WLM integrations")


def test_batch_model(fileutils, wlmutils):
    """Test the launch of a manually constructed batch model"""

    exp_name = "test-batch-model"
    exp = Experiment(exp_name, launcher=wlmutils.get_test_launcher())
    test_dir = fileutils.make_test_dir()

    script = fileutils.get_test_conf_path("sleep.py")
    batch_settings = exp.create_batch_settings(nodes=1, time="00:01:00")

    # hoist the repeated launcher lookup; both lsf and cobalt need an
    # account, cobalt additionally needs a queue
    launcher = wlmutils.get_test_launcher()
    if launcher in ("lsf", "cobalt"):
        batch_settings.set_account(wlmutils.get_test_account())
    if launcher == "cobalt":
        batch_settings.set_queue("debug-flat-quad")

    run_settings = wlmutils.get_run_settings("python", f"{script} --time=5")
    model = exp.create_model(
        "model", path=test_dir, run_settings=run_settings, batch_settings=batch_settings
    )
    model.set_path(test_dir)

    exp.start(model, block=True)
    statuses = exp.get_status(model)
    assert len(statuses) == 1
    assert statuses[0] == status.STATUS_COMPLETED


def test_batch_ensemble(fileutils, wlmutils):
"""Test the launch of a manually constructed batch ensemble"""

Expand Down
113 changes: 102 additions & 11 deletions tests/test_model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import pytest

from smartsim import Experiment
from smartsim._core.launcher.step import SbatchStep, SrunStep
from smartsim.entity import Ensemble, Model
from smartsim.error import EntityExistsError, SSUnsupportedError
from smartsim.settings import RunSettings
from smartsim.settings import RunSettings, SbatchSettings, SrunSettings
from smartsim.settings.mpiSettings import _BaseMPISettings


Expand All @@ -27,22 +29,111 @@ def test_disable_key_prefixing():

def test_catch_colo_mpmd_model():
    """Co-locating a database with an MPMD model must raise
    ``SSUnsupportedError``."""
    exp = Experiment("experiment", launcher="local")
    rs = _BaseMPISettings("python", exe_args="sleep.py", fail_if_missing_exec=False)

    # make it an mpmd model
    rs_2 = _BaseMPISettings("python", exe_args="sleep.py", fail_if_missing_exec=False)
    rs.make_mpmd(rs_2)

    model = exp.create_model("bad_colo_model", rs)

    # make it co-located which should raise an error
    with pytest.raises(SSUnsupportedError):
        model.colocate_db()


def test_attach_batch_settings_to_model():
    """Batch settings given to ``create_model`` are stored on the model;
    omitting them leaves ``batch_settings`` as ``None``."""
    exp = Experiment("experiment", launcher="slurm")
    run_settings = SrunSettings("python", exe_args="sleep.py")
    batch_settings = SbatchSettings()

    plain_model = exp.create_model("test_model", run_settings=run_settings)
    assert plain_model.batch_settings is None

    batch_model = exp.create_model(
        "test_model_2", run_settings=run_settings, batch_settings=batch_settings
    )
    assert isinstance(batch_model.batch_settings, SbatchSettings)


@pytest.fixture
def monkeypatch_exp_controller(monkeypatch):
    """Fixture yielding a function that stubs out an ``Experiment``'s
    controller so launches are recorded instead of executed.

    Calling the returned function with an ``Experiment`` patches its
    controller's ``start`` (to skip the job manager and call ``_launch``
    directly) and ``_launch_step`` (to record instead of launch), and
    returns the list that collects ``(step, entity)`` pairs.
    """

    def _monkeypatch_exp_controller(exp):
        # collected (step, entity) pairs, returned to the test for assertions
        entity_steps = []

        def start_wo_job_manager(self, manifest, block=True, kill_on_interrupt=True):
            # bypass job-manager bookkeeping; only build and dispatch steps
            self._launch(manifest)

        def launch_step_nop(self, step, entity):
            # record what would have been launched instead of launching it
            entity_steps.append((step, entity))

        # __get__ binds the plain functions as methods of this controller
        # instance so `self` is supplied correctly when they are called
        monkeypatch.setattr(
            exp._control,
            "start",
            start_wo_job_manager.__get__(exp._control, type(exp._control)),
        )
        monkeypatch.setattr(
            exp._control,
            "_launch_step",
            launch_step_nop.__get__(exp._control, type(exp._control)),
        )

        return entity_steps

    return _monkeypatch_exp_controller


def test_model_with_batch_settings_makes_batch_step(monkeypatch_exp_controller):
    """A model carrying batch settings is launched as a batch step."""
    exp = Experiment("experiment", launcher="slurm")
    model = exp.create_model(
        "test_model",
        run_settings=SrunSettings("python", exe_args="sleep.py"),
        batch_settings=SbatchSettings(),
    )

    recorded = monkeypatch_exp_controller(exp)
    exp.start(model)

    assert len(recorded) == 1
    step, entity = recorded[0]
    assert isinstance(entity, Model)
    assert isinstance(step, SbatchStep)


def test_model_without_batch_settings_makes_run_step(
    monkeypatch, monkeypatch_exp_controller
):
    """A model without batch settings is launched as a plain run step."""
    exp = Experiment("experiment", launcher="slurm")
    model = exp.create_model(
        "test_model", run_settings=SrunSettings("python", exe_args="sleep.py")
    )

    # pretend we are in an allocation to not raise alloc err
    monkeypatch.setenv("SLURM_JOB_ID", "12345")

    recorded = monkeypatch_exp_controller(exp)
    exp.start(model)

    assert len(recorded) == 1
    step, entity = recorded[0]
    assert isinstance(entity, Model)
    assert isinstance(step, SrunStep)


def test_models_batch_settings_are_ignored_in_ensemble(monkeypatch_exp_controller):
    """When a batch model joins an ensemble, the ensemble's batch
    settings take precedence over the model's own."""
    exp = Experiment("experiment", launcher="slurm")
    model = exp.create_model(
        "test_model",
        run_settings=SrunSettings("python", exe_args="sleep.py"),
        batch_settings=SbatchSettings(nodes=5),
    )

    ensemble = exp.create_ensemble(
        "test_ensemble", batch_settings=SbatchSettings(nodes=10)
    )
    ensemble.add_model(model)

    recorded = monkeypatch_exp_controller(exp)
    exp.start(ensemble)

    assert len(recorded) == 1
    step, entity = recorded[0]
    assert isinstance(entity, Ensemble)
    assert isinstance(step, SbatchStep)
    # the ensemble's node count wins over the model's
    assert step.batch_settings.batch_args["nodes"] == 10
    assert len(step.step_cmds) == 1
    step_cmd = step.step_cmds[0]
    assert any("srun" in tok for tok in step_cmd)  # call the model using run settings
    assert not any("sbatch" in tok for tok in step_cmd)  # no sbatch in sbatch

0 comments on commit dc8f11d

Please sign in to comment.