Skip to content

Commit 39f03e7

Browse files
Kiuk Chungfacebook-github-bot
authored andcommitted
(1/2)(torchx/cli) make internal schedulers a workspace scheduler; implement jetter_workspace_scheduler (#380)
Summary: Pull Request resolved: #380 ~~Part 1/2 (need an additional diff to make changes in penv python)~~ (done with this diff) Does the following: 1. Adds a `torchx.workspace.Workspace` MixIn for Schedulers. Move Docker workspace build from docker_scheduler to torchx.workspace.DockerWorkspace 1. Implements a `JetterWorkspace` that uses jetter to build patches. Falls back to torchx custom patch building for BC. 1. Removes the now unnecessary fb/cmd_run (since we moved everything to workspaces. 1. Adds the `JetterWorkspace` mixin to mast, flow, qf, local_fb schedulers 1. Takes out `DockerWorkspace` from docker_scheduler.build_workspace_and_update_role() 1. Adds the `DockerWorkspace` mixin to docker and k8s schedulers 1. Gets rid of `WorkspaceScheduler` (in favor of `Workspace` mixin) and `WorkspaceRunner` (just builds an optional workspace parameter Runner APIs) 1. (bugfix) read from the config_dirs (home + cwd) to get component default args Reviewed By: d4l3k Differential Revision: D33971673 fbshipit-source-id: c44622b0d01f2d3bbe192b9e0ebf683777c47ec5
1 parent 8b62ea8 commit 39f03e7

17 files changed

+321
-454
lines changed

torchx/cli/cmd_run.py

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
from pyre_extensions import none_throws
1919
from torchx.cli.cmd_base import SubCommand
2020
from torchx.cli.cmd_log import get_logs
21-
from torchx.runner import Runner, config
21+
from torchx.runner import Runner, config, get_runner
2222
from torchx.runner.config import load_sections
23-
from torchx.runner.workspaces import WorkspaceRunner, get_workspace_runner
2423
from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories
2524
from torchx.specs import CfgVal
2625
from torchx.specs.finder import (
@@ -37,6 +36,8 @@
3736
"missing component name, either provide it from the CLI or in .torchxconfig"
3837
)
3938

39+
CONFIG_DIRS = [str(Path.home()), str(Path.cwd())]
40+
4041
logger: logging.Logger = logging.getLogger(__name__)
4142

4243

@@ -180,6 +181,12 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
180181
default=False,
181182
help="Stream logs while waiting for app to finish.",
182183
)
184+
subparser.add_argument(
185+
"--workspace",
186+
"--buck-target",
187+
default=f"file://{Path.cwd()}",
188+
help="local workspace to build/patch (buck-target of main binary if using buck)",
189+
)
183190
subparser.add_argument(
184191
"component_name_and_args",
185192
nargs=argparse.REMAINDER,
@@ -197,57 +204,37 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
197204
run_opts = runner.run_opts()
198205
scheduler_opts = run_opts[args.scheduler]
199206
cfg = _parse_run_config(args.scheduler_args, scheduler_opts)
200-
config_dirs = [str(Path.home()), str(Path.cwd())]
201207

202-
config.apply(scheduler=args.scheduler, cfg=cfg, dirs=config_dirs)
203-
204-
config_files = config.find_configs(dirs=config_dirs)
205-
workspace = (
206-
"file://" + os.path.dirname(config_files[0]) if config_files else None
207-
)
208+
config.apply(scheduler=args.scheduler, cfg=cfg, dirs=CONFIG_DIRS)
208209

209210
component, component_args = _parse_component_name_and_args(
210211
args.component_name_and_args,
211212
none_throws(self._subparser),
212-
dirs=config_dirs,
213+
dirs=CONFIG_DIRS,
213214
)
214-
215215
try:
216216
if args.dryrun:
217-
if isinstance(runner, WorkspaceRunner):
218-
dryrun_info = runner.dryrun_component(
219-
component,
220-
component_args,
221-
args.scheduler,
222-
workspace=workspace,
223-
cfg=cfg,
224-
)
225-
else:
226-
dryrun_info = runner.dryrun_component(
227-
component, component_args, args.scheduler, cfg=cfg
228-
)
217+
dryrun_info = runner.dryrun_component(
218+
component,
219+
component_args,
220+
args.scheduler,
221+
workspace=args.workspace,
222+
cfg=cfg,
223+
)
229224
logger.info(
230225
"\n=== APPLICATION ===\n"
231226
f"{pformat(asdict(dryrun_info._app), indent=2, width=80)}"
232227
)
233228

234229
logger.info("\n=== SCHEDULER REQUEST ===\n" f"{dryrun_info}")
235230
else:
236-
if isinstance(runner, WorkspaceRunner):
237-
app_handle = runner.run_component(
238-
component,
239-
component_args,
240-
args.scheduler,
241-
workspace=workspace,
242-
cfg=cfg,
243-
)
244-
else:
245-
app_handle = runner.run_component(
246-
component,
247-
component_args,
248-
args.scheduler,
249-
cfg=cfg,
250-
)
231+
app_handle = runner.run_component(
232+
component,
233+
component_args,
234+
args.scheduler,
235+
workspace=args.workspace,
236+
cfg=cfg,
237+
)
251238
# DO NOT delete this line. It is used by slurm tests to retrieve the app id
252239
print(app_handle)
253240

@@ -278,8 +265,8 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
278265

279266
def run(self, args: argparse.Namespace) -> None:
280267
os.environ["TORCHX_CONTEXT_NAME"] = os.getenv("TORCHX_CONTEXT_NAME", "cli_run")
281-
component_defaults = load_sections(prefix="component")
282-
with get_workspace_runner(component_defaults=component_defaults) as runner:
268+
component_defaults = load_sections(prefix="component", dirs=CONFIG_DIRS)
269+
with get_runner(component_defaults=component_defaults) as runner:
283270
self._run(runner, args)
284271

285272
def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:

torchx/cli/test/cmd_run_test.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
CmdRun,
2323
_parse_component_name_and_args,
2424
_parse_run_config,
25-
logger,
2625
)
2726
from torchx.schedulers.local_scheduler import SignalException
2827
from torchx.specs import runopts
@@ -160,26 +159,6 @@ def test_run_dryrun(self, mock_runner_run: MagicMock) -> None:
160159
self.cmd_run.run(args)
161160
mock_runner_run.assert_not_called()
162161

163-
@patch("torchx.runner.workspaces.WorkspaceRunner._patch_app")
164-
def test_runopts_not_found(self, patch_app: MagicMock) -> None:
165-
args = self.parser.parse_args(
166-
[
167-
"--dryrun",
168-
"--scheduler",
169-
"kubernetes",
170-
"utils.echo",
171-
"--image",
172-
"/tmp",
173-
]
174-
)
175-
with patch.object(logger, "error") as log_error:
176-
with self.assertRaises(SystemExit) as e:
177-
self.cmd_run.run(args)
178-
msg = log_error.call_args[0][0]
179-
self.assertTrue(
180-
"Scheduler arg is incorrect or missing required option" in msg
181-
)
182-
183162
def _get_test_runopts(self) -> runopts:
184163
opts = runopts()
185164
opts.add("foo", type_=str, default="", help="")

torchx/components/component_test_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ def run_appdef_on_scheduler(
4848

4949
runner = get_runner("test-runner")
5050
if dryrun:
51-
dryrun_info = runner.dryrun(app_def, scheduler, cfg)
51+
dryrun_info = runner.dryrun(app_def, scheduler, cfg=cfg)
5252
print(f"Dryrun info: {dryrun_info}")
5353
return dryrun_info
5454
else:
55-
app_handle = runner.run(app_def, scheduler, cfg)
55+
app_handle = runner.run(app_def, scheduler, cfg=cfg)
5656
print(f"AppHandle: {app_handle}")
5757
app_status = runner.wait(app_handle)
5858
print(f"Final status: {app_status}")

torchx/runner/api.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
runopts,
3131
)
3232
from torchx.specs.finder import get_component
33+
from torchx.workspace.api import Workspace
3334

3435

3536
logger: logging.Logger = logging.getLogger(__name__)
@@ -103,6 +104,7 @@ def run_component(
103104
component_args: List[str],
104105
scheduler: SchedulerBackend,
105106
cfg: Optional[Mapping[str, CfgVal]] = None,
107+
workspace: Optional[str] = None,
106108
) -> AppHandle:
107109
"""
108110
Runs a component.
@@ -136,7 +138,13 @@ def run_component(
136138
ComponentNotFoundException: if the ``component_path`` is failed to resolve.
137139
"""
138140

139-
dryrun_info = self.dryrun_component(component, component_args, scheduler, cfg)
141+
dryrun_info = self.dryrun_component(
142+
component,
143+
component_args,
144+
scheduler,
145+
cfg=cfg,
146+
workspace=workspace,
147+
)
140148
return self.schedule(dryrun_info)
141149

142150
def dryrun_component(
@@ -145,6 +153,7 @@ def dryrun_component(
145153
component_args: List[str],
146154
scheduler: SchedulerBackend,
147155
cfg: Optional[Mapping[str, CfgVal]] = None,
156+
workspace: Optional[str] = None,
148157
) -> AppDryRunInfo:
149158
"""
150159
Dryrun version of :py:func:`run_component`. Will not actually run the
@@ -156,13 +165,14 @@ def dryrun_component(
156165
component_args,
157166
self._component_defaults.get(component, None),
158167
)
159-
return self.dryrun(app, scheduler, cfg)
168+
return self.dryrun(app, scheduler, cfg=cfg, workspace=workspace)
160169

161170
def run(
162171
self,
163172
app: AppDef,
164173
scheduler: SchedulerBackend,
165174
cfg: Optional[Mapping[str, CfgVal]] = None,
175+
workspace: Optional[str] = None,
166176
) -> AppHandle:
167177
"""
168178
Runs the given application in the specified mode.
@@ -174,7 +184,7 @@ def run(
174184
An application handle that is used to call other action APIs on the app.
175185
"""
176186

177-
dryrun_info = self.dryrun(app, scheduler, cfg)
187+
dryrun_info = self.dryrun(app, scheduler, cfg=cfg, workspace=workspace)
178188
return self.schedule(dryrun_info)
179189

180190
def schedule(self, dryrun_info: AppDryRunInfo) -> AppHandle:
@@ -229,6 +239,7 @@ def dryrun(
229239
app: AppDef,
230240
scheduler: SchedulerBackend,
231241
cfg: Optional[Mapping[str, CfgVal]] = None,
242+
workspace: Optional[str] = None,
232243
) -> AppDryRunInfo:
233244
"""
234245
Dry runs an app on the given scheduler with the provided run configs.
@@ -265,6 +276,23 @@ def dryrun(
265276
cfg = cfg or dict()
266277
with log_event("dryrun", scheduler, runcfg=json.dumps(cfg) if cfg else None):
267278
sched = self._scheduler(scheduler)
279+
280+
if workspace and isinstance(sched, Workspace):
281+
role = app.roles[0]
282+
old_img = role.image
283+
logger.info(
284+
f"Building workspace: {workspace} for role[0]: {role.name}, image: {old_img}"
285+
)
286+
sched.build_workspace_and_update_role(role, workspace)
287+
logger.info("Done building workspace")
288+
if old_img != role.image:
289+
logger.info(f"New image: {role.image} built from workspace")
290+
else:
291+
logger.info(
292+
f"Reusing original image: {old_img} for role[0]: {role.name}."
293+
" Either a patch was built or no changes to workspace was detected."
294+
)
295+
268296
sched._validate(app, scheduler)
269297
dryrun_info = sched.submit_dryrun(app, cfg)
270298
dryrun_info._scheduler = scheduler

torchx/runner/test/api_test.py

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,21 @@
1010
import shutil
1111
import tempfile
1212
import unittest
13+
from typing import Mapping, Optional
1314
from unittest.mock import MagicMock, patch
1415

1516
from pyre_extensions import none_throws
1617
from torchx.runner import Runner, get_runner
17-
from torchx.schedulers.api import DescribeAppResponse
18+
from torchx.schedulers.api import DescribeAppResponse, Scheduler
1819
from torchx.schedulers.local_scheduler import (
1920
LocalDirectoryImageProvider,
2021
LocalScheduler,
2122
)
2223
from torchx.schedulers.test.test_util import write_shell_script
23-
from torchx.specs.api import (
24-
AppDef,
25-
AppState,
26-
Resource,
27-
Role,
28-
UnknownAppException,
29-
)
24+
from torchx.specs import AppDryRunInfo, CfgVal
25+
from torchx.specs.api import AppDef, AppState, Resource, Role, UnknownAppException
3026
from torchx.specs.finder import ComponentNotFoundException
27+
from torchx.workspace import Workspace
3128

3229

3330
GET_SCHEDULERS = "torchx.runner.api.get_schedulers"
@@ -136,6 +133,67 @@ def test_dryrun(self, _) -> None:
136133
scheduler_mock.submit_dryrun.assert_called_once_with(app, self.cfg)
137134
scheduler_mock._validate.assert_called_once()
138135

136+
def test_dryrun_with_workspace(self, _) -> None:
137+
class TestScheduler(Scheduler, Workspace):
138+
def __init__(self, build_new_img: bool):
139+
Scheduler.__init__(self, backend="ignored", session_name="ignored")
140+
self.build_new_img = build_new_img
141+
142+
def schedule(self, dryrun_info: AppDryRunInfo) -> str:
143+
pass
144+
145+
def _submit_dryrun(
146+
self, app: AppDef, cfg: Mapping[str, CfgVal]
147+
) -> AppDryRunInfo[AppDef]:
148+
return AppDryRunInfo(app, lambda s: s)
149+
150+
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
151+
pass
152+
153+
def _cancel_existing(self, app_id: str) -> None:
154+
pass
155+
156+
def build_workspace_and_update_role(
157+
self, role: Role, workspace: str
158+
) -> None:
159+
if self.build_new_img:
160+
role.image = f"{role.image}_new"
161+
162+
with Runner(
163+
name=SESSION_NAME,
164+
schedulers={
165+
"no-build-img": TestScheduler(build_new_img=False),
166+
"builds-img": TestScheduler(build_new_img=True),
167+
},
168+
) as runner:
169+
app = AppDef(
170+
"ignored",
171+
roles=[
172+
Role(
173+
name="sleep",
174+
image="foo",
175+
resource=resource.SMALL,
176+
entrypoint="sleep",
177+
args=["1"],
178+
),
179+
Role(
180+
name="sleep",
181+
image="bar",
182+
resource=resource.SMALL,
183+
entrypoint="sleep",
184+
args=["1"],
185+
),
186+
],
187+
)
188+
dryruninfo = runner.dryrun(app, "no-build-img", workspace="//workspace")
189+
self.assertEqual("foo", dryruninfo.request.roles[0].image)
190+
self.assertEqual("bar", dryruninfo.request.roles[1].image)
191+
192+
dryruninfo = runner.dryrun(app, "builds-img", workspace="//workspace")
193+
# workspace is attached to role[0] by default
194+
self.assertEqual("foo_new", dryruninfo.request.roles[0].image)
195+
self.assertEqual("bar", dryruninfo.request.roles[1].image)
196+
139197
def test_describe(self, _) -> None:
140198
with Runner(
141199
name=SESSION_NAME, schedulers={"local_dir": self.scheduler}

0 commit comments

Comments
 (0)