# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging
import os
import time
import traceback

import ray
from ray.tune.logger import NoopLogger
from ray.tune.trial import Trial, Resources, Checkpoint
from ray.tune.trial_executor import TrialExecutor

logger = logging.getLogger(__name__)


class RayTrialExecutor(TrialExecutor):
    """An implementation of TrialExecutor based on Ray."""

    def __init__(self, queue_trials=False):
        super(RayTrialExecutor, self).__init__(queue_trials)
        self._running = {}
        # A trial that resumes after being paused should not call
        # trial.train.remote() again, so no new remote object id is
        # generated; self._paused stores such paused trials instead.
        self._paused = {}
        self._avail_resources = Resources(cpu=0, gpu=0)
        self._committed_resources = Resources(cpu=0, gpu=0)
        self._resources_initialized = False
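
    # Illustrative sketch (not part of the original module): a driver such as
    # ray.tune's TrialRunner typically exercises this executor roughly as
    # follows; `trial` is a hypothetical Trial object and the loop is
    # simplified.
    #
    #   executor = RayTrialExecutor(queue_trials=True)
    #   executor.on_step_begin()                 # refresh cluster resources
    #   if executor.has_resources(trial.resources):
    #       executor.start_trial(trial)          # launches the remote actor
    #   ready_trial = executor.get_next_available_trial()
    #   result = executor.fetch_result(ready_trial)
    #   executor.continue_training(ready_trial)  # or pause_trial/stop_trial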

    def _setup_runner(self, trial):
        cls = ray.remote(
            num_cpus=trial.resources.cpu,
            num_gpus=trial.resources.gpu)(trial._get_trainable_cls())

        trial.init_logger()
        remote_logdir = trial.logdir

        def logger_creator(config):
            # Set the working dir in the remote process, for user file writes
            if not os.path.exists(remote_logdir):
                os.makedirs(remote_logdir)
            os.chdir(remote_logdir)
            return NoopLogger(config, remote_logdir)

        # Logging for trials is handled centrally by TrialRunner, so
        # configure the remote runner to use a noop-logger.
        return cls.remote(config=trial.config, logger_creator=logger_creator)
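
    # Sketch of what the ray.remote wrapping above yields (MyTrainable and
    # the argument values are illustrative, not taken from this module):
    #
    #   RemoteTrainable = ray.remote(num_cpus=1, num_gpus=0)(MyTrainable)
    #   runner = RemoteTrainable.remote(config={}, logger_creator=None)
    #   object_id = runner.train.remote()   # actor method calls return ids
    #   result = ray.get(object_id)
    #
    # The actor holds its declared CPUs/GPUs for its lifetime, which is why
    # the executor also tracks committed resources separately.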

    def _train(self, trial):
        """Start one iteration of training and save remote id."""

        assert trial.status == Trial.RUNNING, trial.status
        remote = trial.runner.train.remote()
        self._running[remote] = trial

    def _start_trial(self, trial, checkpoint=None):
        prior_status = trial.status
        trial.status = Trial.RUNNING
        trial.runner = self._setup_runner(trial)
        if not self.restore(trial, checkpoint):
            return
        previous_run = self._find_item(self._paused, trial)
        if prior_status == Trial.PAUSED and previous_run:
            # If Trial was in flight when paused, self._paused stores result.
            self._paused.pop(previous_run[0])
            self._running[previous_run[0]] = trial
        else:
            self._train(trial)

    def _stop_trial(self, trial, error=False, error_msg=None,
                    stop_logger=True):
        """Stops this trial.

        Stops this trial, releasing all allocated resources. If stopping the
        trial fails, the run will be marked as terminated in error, but no
        exception will be thrown.

        Args:
            error (bool): Whether to mark this trial as terminated in error.
            error_msg (str): Optional error message.
            stop_logger (bool): Whether to shut down the trial logger.
        """

        if error:
            trial.status = Trial.ERROR
        else:
            trial.status = Trial.TERMINATED

        try:
            trial.write_error_log(error_msg)
            if hasattr(trial, 'runner') and trial.runner:
                stop_tasks = []
                stop_tasks.append(trial.runner.stop.remote())
                stop_tasks.append(trial.runner.__ray_terminate__.remote())
                # TODO(ekl) seems like wait hangs when killing actors
                _, unfinished = ray.wait(
                    stop_tasks, num_returns=2, timeout=250)
        except Exception:
            logger.exception("Error stopping runner.")
            trial.status = Trial.ERROR
        finally:
            trial.runner = None

        if stop_logger:
            trial.close_logger()
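
    # Note on the ray.wait call above: ray.wait splits a list of object ids
    # into (ready, unfinished) once num_returns of them are ready or the
    # timeout expires (milliseconds in this Ray version, so 250 is a short
    # grace period). A minimal illustration, with `runner` standing in for
    # an actor handle like those created by _setup_runner:
    #
    #   stop_tasks = [runner.stop.remote(),
    #                 runner.__ray_terminate__.remote()]
    #   ready, unfinished = ray.wait(stop_tasks, num_returns=2, timeout=250)
    #   # ids still listed in `unfinished` did not complete in time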

    def start_trial(self, trial, checkpoint_obj=None):
        """Starts the trial."""

        self._commit_resources(trial.resources)
        try:
            self._start_trial(trial, checkpoint_obj)
        except Exception:
            logger.exception("Error starting runner - retrying...")
            error_msg = traceback.format_exc()
            time.sleep(2)
            self._stop_trial(trial, error=True, error_msg=error_msg)
            try:
                self._start_trial(trial)
            except Exception:
                logger.exception("Error starting runner, aborting!")
                error_msg = traceback.format_exc()
                self._stop_trial(trial, error=True, error_msg=error_msg)
                # note that we don't return the resources, since they may
                # have been lost

    def _find_item(self, dictionary, item):
        out = [rid for rid, t in dictionary.items() if t is item]
        return out

    def stop_trial(self, trial, error=False, error_msg=None,
                   stop_logger=True):
        """Stops the trial, returning resources only if they were allocated."""

        prior_status = trial.status
        self._stop_trial(
            trial, error=error, error_msg=error_msg, stop_logger=stop_logger)
        if prior_status == Trial.RUNNING:
            self._return_resources(trial.resources)
            out = self._find_item(self._running, trial)
            for result_id in out:
                self._running.pop(result_id)

    def continue_training(self, trial):
        """Continues the training of this trial."""

        self._train(trial)

    def pause_trial(self, trial):
        """Pauses the trial.

        If the trial is in flight, its pending result is preserved in
        self._paused before pausing and is restored when the trial resumes.
        """

        trial_future = self._find_item(self._running, trial)
        if trial_future:
            self._paused[trial_future[0]] = trial
        super(RayTrialExecutor, self).pause_trial(trial)
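
    # Bookkeeping sketch for pause/resume (the object id is hypothetical):
    #
    #   while training is in flight:  self._running == {train_id: trial}
    #   pause_trial copies the pending id: self._paused[train_id] = trial
    #   on resume, _start_trial finds train_id in self._paused and moves it
    #   back into self._running instead of issuing a new _train() call.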

    def reset_trial(self, trial, new_config, new_experiment_tag):
        """Tries to invoke `Trainable.reset_config()` to reset trial.

        Args:
            trial (Trial): Trial to be reset.
            new_config (dict): New configuration for Trial trainable.
            new_experiment_tag (str): New experiment name for trial.

        Returns:
            True if `reset_config` is successful else False.
        """

        trial.experiment_tag = new_experiment_tag
        trial.config = new_config
        trainable = trial.runner
        reset_val = ray.get(trainable.reset_config.remote(new_config))
        return reset_val

    def get_running_trials(self):
        """Returns the running trials."""

        return list(self._running.values())

    def get_next_available_trial(self):
        [result_id], _ = ray.wait(list(self._running))
        return self._running[result_id]

    def fetch_result(self, trial):
        """Fetches one result of the running trials.

        Returns:
            Result of the most recent trial training run.
        """
        trial_future = self._find_item(self._running, trial)
        if not trial_future:
            raise ValueError("Trial was not running.")
        self._running.pop(trial_future[0])
        result = ray.get(trial_future[0])
        return result

    def _commit_resources(self, resources):
        self._committed_resources = Resources(
            self._committed_resources.cpu + resources.cpu_total(),
            self._committed_resources.gpu + resources.gpu_total())

    def _return_resources(self, resources):
        self._committed_resources = Resources(
            self._committed_resources.cpu - resources.cpu_total(),
            self._committed_resources.gpu - resources.gpu_total())
        assert self._committed_resources.cpu >= 0
        assert self._committed_resources.gpu >= 0

    def _update_avail_resources(self):
        resources = ray.global_state.cluster_resources()
        num_cpus = resources["CPU"]
        num_gpus = resources["GPU"]
        self._avail_resources = Resources(int(num_cpus), int(num_gpus))
        self._resources_initialized = True

    def has_resources(self, resources):
        """Returns whether this runner has at least the specified resources."""

        cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu
        gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu

        have_space = (resources.cpu_total() <= cpu_avail
                      and resources.gpu_total() <= gpu_avail)
        if have_space:
            return True

        can_overcommit = self._queue_trials
        if (resources.cpu_total() > 0 and cpu_avail <= 0) or \
           (resources.gpu_total() > 0 and gpu_avail <= 0):
            can_overcommit = False  # requested resource is already saturated

        if can_overcommit:
            logger.warning(
                "Allowing trial to start even though the "
                "cluster does not have enough free resources. Trial actors "
                "may appear to hang until enough resources are added to the "
                "cluster (e.g., via autoscaling). You can disable this "
                "behavior by specifying `queue_trials=False` in "
                "ray.tune.run_experiments().")
            return True

        return False
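
    # Worked example of the accounting above (numbers are illustrative):
    # with 8 cluster CPUs and 6 already committed, cpu_avail == 2. A trial
    # requesting cpu_total() == 4 does not fit, but since cpu_avail > 0 the
    # CPU resource is not yet saturated, so with queue_trials=True the
    # executor overcommits and lets the trial queue on the cluster. Once
    # cpu_avail reaches 0, further CPU-requesting trials are refused even
    # when queueing is enabled.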

    def debug_string(self):
        """Returns a human readable message for printing to the console."""

        if self._resources_initialized:
            return "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
                self._committed_resources.cpu, self._avail_resources.cpu,
                self._committed_resources.gpu, self._avail_resources.gpu)
        else:
            return ""

    def on_step_begin(self):
        """Updates the available resources before each step() call."""

        self._update_avail_resources()
def save(self, trial, storage=Checkpoint.DISK):
"""Saves the trial's state to a checkpoint."""
trial._checkpoint.storage = storage
if storage == Checkpoint.MEMORY:
trial._checkpoint.value = trial.runner.save_to_object.remote()
else:
trial._checkpoint.value = ray.get(trial.runner.save.remote())
return trial._checkpoint.value
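
    # Note the asymmetry above: a MEMORY checkpoint stores the un-resolved
    # object id returned by save_to_object.remote(), whereas a DISK
    # checkpoint blocks on ray.get() and stores the resulting checkpoint
    # path. restore() below dispatches on Checkpoint.storage accordingly.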

    def restore(self, trial, checkpoint=None):
        """Restores training state from a given model checkpoint."""

        if checkpoint is None or checkpoint.value is None:
            checkpoint = trial._checkpoint
        if checkpoint is None or checkpoint.value is None:
            return True
        if trial.runner is None:
            logger.error("Unable to restore - no runner.")
            trial.status = Trial.ERROR
            return False
        try:
            value = checkpoint.value
            if checkpoint.storage == Checkpoint.MEMORY:
                assert type(value) != Checkpoint, type(value)
                ray.get(trial.runner.restore_from_object.remote(value))
            else:
                ray.get(trial.runner.restore.remote(value))
            return True
        except Exception:
            logger.exception("Error restoring runner.")
            trial.status = Trial.ERROR
            return False
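

# Minimal usage sketch (illustrative, not part of the original module). It
# exercises only the resource-accounting surface of RayTrialExecutor, which
# needs nothing beyond a running Ray instance with this older Ray/Tune API;
# starting real trials additionally requires Trial objects managed by
# ray.tune's TrialRunner, which is out of scope here.
if __name__ == "__main__":
    ray.init()

    executor = RayTrialExecutor(queue_trials=True)
    executor.on_step_begin()  # pulls CPU/GPU totals from the cluster

    # e.g. "Resources requested: 0/8 CPUs, 0/0 GPUs" on an 8-CPU machine
    print(executor.debug_string())

    request = Resources(cpu=1, gpu=0)
    print("Can fit a 1-CPU trial:", executor.has_resources(request))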