Merged
6 changes: 3 additions & 3 deletions python/ray/autoscaler/updater.py
@@ -75,9 +75,9 @@ def run(self):
             self.provider.set_node_tags(self.node_id,
                                         {TAG_RAY_NODE_STATUS: "UpdateFailed"})
             if self.logfile is not None:
-                print("----- BEGIN REMOTE LOGS -----\n" + open(
-                    self.logfile.name).read() + "\n----- END REMOTE LOGS -----"
-                      )
+                print("----- BEGIN REMOTE LOGS -----\n" +
+                      open(self.logfile.name).read() +
+                      "\n----- END REMOTE LOGS -----")
             raise e
         self.provider.set_node_tags(
             self.node_id, {
17 changes: 8 additions & 9 deletions python/ray/experimental/array/distributed/linalg.py
@@ -76,10 +76,9 @@ def tsqr(a):
                 lower = [a.shape[1], 0]
                 upper = [2 * a.shape[1], core.BLOCK_SIZE]
             ith_index //= 2
-            q_block_current = ra.dot.remote(q_block_current,
-                                            ra.subarray.remote(
-                                                q_tree[ith_index, j], lower,
-                                                upper))
+            q_block_current = ra.dot.remote(
+                q_block_current,
+                ra.subarray.remote(q_tree[ith_index, j], lower, upper))
         q_result.objectids[i] = q_block_current
     r = current_rs[0]
     return q_result, ray.get(r)
@@ -222,10 +221,10 @@ def qr(a):
         y_col_block = core.subblocks.remote(y_res, [], [i])
         q = core.subtract.remote(
             q,
-            core.dot.remote(y_col_block,
-                            core.dot.remote(
-                                Ts[i],
-                                core.dot.remote(
-                                    core.transpose.remote(y_col_block), q))))
+            core.dot.remote(
+                y_col_block,
+                core.dot.remote(
+                    Ts[i],
+                    core.dot.remote(core.transpose.remote(y_col_block), q))))
 
     return ray.get(q), r_res
8 changes: 4 additions & 4 deletions python/ray/experimental/state.py
@@ -749,8 +749,8 @@ def micros_rel(ts):
                     "name":
                     "SubmitTask",
                     "args": {},
-                    "id": (parent_info["worker_id"] +
-                           str(micros(min(parent_times))))
+                    "id": (parent_info["worker_id"] + str(
+                        micros(min(parent_times))))
                 }
                 full_trace.append(parent)
 
@@ -824,8 +824,8 @@ def micros_rel(ts):
                     "name":
                     "SubmitTask",
                     "args": {},
-                    "id": (parent_info["worker_id"] +
-                           str(micros(min(parent_times))))
+                    "id": (parent_info["worker_id"] + str(
+                        micros(min(parent_times))))
                 }
                 full_trace.append(parent)
 
12 changes: 6 additions & 6 deletions python/ray/experimental/ui.py
@@ -451,8 +451,8 @@ def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks,
         # Create the distribution to plot
         distr = []
         for task_id, data in tasks.items():
-            distr.append(
-                data["store_outputs_end"] - data["get_arguments_start"])
+            distr.append(data["store_outputs_end"] -
+                         data["get_arguments_start"])
 
         # Create a histogram from the distribution
         top, bin_edges = np.histogram(distr, bins="auto")
@@ -520,10 +520,10 @@ def compute_utilizations(abs_earliest,
         # Walk over each time bucket that this task intersects, adding the
         # amount of time that the task intersects within each bucket
         for bucket_idx in range(start_bucket, end_bucket + 1):
-            bucket_start_time = ((
-                earliest_time + bucket_idx) * bucket_time_length)
-            bucket_end_time = ((earliest_time +
-                                (bucket_idx + 1)) * bucket_time_length)
+            bucket_start_time = (
+                (earliest_time + bucket_idx) * bucket_time_length)
+            bucket_end_time = (
+                (earliest_time + (bucket_idx + 1)) * bucket_time_length)
 
             task_start_time_within_bucket = max(task_start_time,
                                                 bucket_start_time)
5 changes: 3 additions & 2 deletions python/ray/log_monitor.py
@@ -39,8 +39,9 @@ def __init__(self, redis_ip_address, redis_port, node_ip_address):
     def update_log_filenames(self):
         """Get the most up-to-date list of log files to monitor from Redis."""
         num_current_log_files = len(self.log_files)
-        new_log_filenames = self.redis_client.lrange("LOG_FILENAMES:{}".format(
-            self.node_ip_address), num_current_log_files, -1)
+        new_log_filenames = self.redis_client.lrange(
+            "LOG_FILENAMES:{}".format(self.node_ip_address),
+            num_current_log_files, -1)
         for log_filename in new_log_filenames:
             print("Beginning to track file {}".format(log_filename))
             assert log_filename not in self.log_files
11 changes: 5 additions & 6 deletions python/ray/monitor.py
@@ -189,10 +189,9 @@ def cleanup_object_table(self):
                 if manager in self.dead_plasma_managers:
                     # If the object was on a dead plasma manager, remove that
                     # location entry.
-                    ok = self.state._execute_command(object_id,
-                                                     "RAY.OBJECT_TABLE_REMOVE",
-                                                     object_id.id(),
-                                                     hex_to_binary(manager))
+                    ok = self.state._execute_command(
+                        object_id, "RAY.OBJECT_TABLE_REMOVE", object_id.id(),
+                        hex_to_binary(manager))
                     if ok != b"OK":
                         log.warn("Failed to remove object location for dead "
                                  "plasma manager.")
@@ -507,8 +506,8 @@ def run(self):
         log.debug("{} dead local schedulers, {} plasma managers total, {} "
                   "dead plasma managers".format(
                       len(self.dead_local_schedulers),
-                      (len(self.live_plasma_managers) +
-                       len(self.dead_plasma_managers)),
+                      (len(self.live_plasma_managers) + len(
+                          self.dead_plasma_managers)),
                       len(self.dead_plasma_managers)))
 
         # Handle messages from the subscription channels.
6 changes: 3 additions & 3 deletions python/ray/tune/hyperband.py
@@ -391,9 +391,9 @@ def _calculate_total_work(self, n, r, s):
 
     def __repr__(self):
         status = ", ".join([
-            "Max Size (n)={}".format(self._n), "Milestone (r)={}".format(
-                self._cumul_r), "completed={:.1%}".format(
-                self.completion_percentage())
+            "Max Size (n)={}".format(self._n),
+            "Milestone (r)={}".format(self._cumul_r),
+            "completed={:.1%}".format(self.completion_percentage())
         ])
         counts = collections.Counter([t.status for t in self._all_trials])
         trial_statuses = ", ".join(
40 changes: 22 additions & 18 deletions python/ray/tune/test/trial_scheduler_test.py
@@ -370,12 +370,14 @@ def testTrialErrored(self):
             mock_runner._launch_trial(t)
 
         sched.on_trial_error(mock_runner, t3)
-        self.assertEqual(TrialScheduler.PAUSE,
-                         sched.on_trial_result(mock_runner, t1,
-                                               result(stats[str(1)]["r"], 10)))
-        self.assertEqual(TrialScheduler.CONTINUE,
-                         sched.on_trial_result(mock_runner, t2,
-                                               result(stats[str(1)]["r"], 10)))
+        self.assertEqual(
+            TrialScheduler.PAUSE,
+            sched.on_trial_result(mock_runner, t1,
+                                  result(stats[str(1)]["r"], 10)))
+        self.assertEqual(
+            TrialScheduler.CONTINUE,
+            sched.on_trial_result(mock_runner, t2,
+                                  result(stats[str(1)]["r"], 10)))
 
     def testTrialErrored2(self):
         """Check successive halving happened even when last trial failed"""
@@ -405,12 +407,14 @@ def testTrialEndedEarly(self):
             mock_runner._launch_trial(t)
 
         sched.on_trial_complete(mock_runner, t3, result(1, 12))
-        self.assertEqual(TrialScheduler.PAUSE,
-                         sched.on_trial_result(mock_runner, t1,
-                                               result(stats[str(1)]["r"], 10)))
-        self.assertEqual(TrialScheduler.CONTINUE,
-                         sched.on_trial_result(mock_runner, t2,
-                                               result(stats[str(1)]["r"], 10)))
+        self.assertEqual(
+            TrialScheduler.PAUSE,
+            sched.on_trial_result(mock_runner, t1,
+                                  result(stats[str(1)]["r"], 10)))
+        self.assertEqual(
+            TrialScheduler.CONTINUE,
+            sched.on_trial_result(mock_runner, t2,
+                                  result(stats[str(1)]["r"], 10)))
 
     def testTrialEndedEarly2(self):
         """Check successive halving happened even when last trial failed"""
@@ -449,13 +453,13 @@ def testAddAfterHalving(self):
         self.assertEqual(len(sched._state["bracket"].current_trials()), 2)
 
         # Make sure that newly added trial gets fair computation (not just 1)
-        self.assertEqual(TrialScheduler.CONTINUE,
-                         sched.on_trial_result(mock_runner, t,
-                                               result(init_units, 12)))
+        self.assertEqual(
+            TrialScheduler.CONTINUE,
+            sched.on_trial_result(mock_runner, t, result(init_units, 12)))
         new_units = init_units + int(init_units * sched._eta)
-        self.assertEqual(TrialScheduler.PAUSE,
-                         sched.on_trial_result(mock_runner, t,
-                                               result(new_units, 12)))
+        self.assertEqual(
+            TrialScheduler.PAUSE,
+            sched.on_trial_result(mock_runner, t, result(new_units, 12)))
 
     def testAlternateMetrics(self):
         """Checking that alternate metrics will pass."""
17 changes: 10 additions & 7 deletions python/ray/tune/trial.py
@@ -174,8 +174,8 @@ def stop(self, error=False, error_msg=None, stop_logger=True):
         try:
             if error_msg and self.logdir:
                 self.num_failures += 1
-                error_file = os.path.join(self.logdir, "error_{}.txt".format(
-                    date_str()))
+                error_file = os.path.join(self.logdir,
+                                          "error_{}.txt".format(date_str()))
                 with open(error_file, "w") as f:
                     f.write(error_msg)
                 self.error_file = error_file
@@ -259,9 +259,10 @@ def location_string(hostname, pid):
             return '{} pid={}'.format(hostname, pid)
 
         pieces = [
-            '{} [{}]'.format(self._status_string(),
-                             location_string(self.last_result.hostname,
-                                             self.last_result.pid)),
+            '{} [{}]'.format(
+                self._status_string(),
+                location_string(self.last_result.hostname,
+                                self.last_result.pid)),
             '{} s'.format(int(self.last_result.time_total_s)), '{} ts'.format(
                 int(self.last_result.timesteps_total))
         ]
@@ -281,8 +282,10 @@ def location_string(hostname, pid):
         return ', '.join(pieces)
 
     def _status_string(self):
-        return "{}{}".format(self.status, ", {} failures: {}".format(
-            self.num_failures, self.error_file) if self.error_file else "")
+        return "{}{}".format(
+            self.status, ", {} failures: {}".format(self.num_failures,
+                                                    self.error_file)
+            if self.error_file else "")
 
     def has_checkpoint(self):
         return self._checkpoint_path is not None or \
14 changes: 6 additions & 8 deletions python/ray/worker.py
@@ -845,10 +845,9 @@ def _process_task(self, task):
                     e, None)
                 return
             except Exception as e:
-                self._handle_process_task_failure(function_id, return_object_ids,
-                                                  e,
-                                                  ray.utils.format_error_message(
-                                                      traceback.format_exc()))
+                self._handle_process_task_failure(
+                    function_id, return_object_ids, e,
+                    ray.utils.format_error_message(traceback.format_exc()))
                 return
 
         # Execute the task.
@@ -881,10 +880,9 @@ def _process_task(self, task):
                 outputs = (outputs, )
             self._store_outputs_in_objstore(return_object_ids, outputs)
         except Exception as e:
-            self._handle_process_task_failure(function_id, return_object_ids,
-                                              e,
-                                              ray.utils.format_error_message(
-                                                  traceback.format_exc()))
+            self._handle_process_task_failure(
+                function_id, return_object_ids, e,
+                ray.utils.format_error_message(traceback.format_exc()))
 
     def _handle_process_task_failure(self, function_id, return_object_ids,
                                      error, backtrace):
17 changes: 9 additions & 8 deletions test/actor_test.py
@@ -1038,14 +1038,14 @@ def get_location_and_ids(self):
         def locations_to_intervals_for_many_tasks():
             # Launch a bunch of GPU tasks.
             locations_ids_and_intervals = ray.get([
-                f1.remote() for _ in range(
-                    5 * num_local_schedulers * num_gpus_per_scheduler)
+                f1.remote() for _ in range(5 * num_local_schedulers *
+                                           num_gpus_per_scheduler)
             ] + [
-                f2.remote() for _ in range(
-                    5 * num_local_schedulers * num_gpus_per_scheduler)
+                f2.remote() for _ in range(5 * num_local_schedulers *
+                                           num_gpus_per_scheduler)
             ] + [
-                f1.remote() for _ in range(
-                    5 * num_local_schedulers * num_gpus_per_scheduler)
+                f1.remote() for _ in range(5 * num_local_schedulers *
+                                           num_gpus_per_scheduler)
             ])
 
             locations_to_intervals = collections.defaultdict(lambda: [])
@@ -1108,8 +1108,9 @@ def locations_to_intervals_for_many_tasks():
 
         # Create more actors to fill up all the GPUs.
         more_actors = [
-            Actor1.remote() for _ in range(
-                num_local_schedulers * num_gpus_per_scheduler - 1 - 3)
+            Actor1.remote()
+            for _ in range(num_local_schedulers * num_gpus_per_scheduler - 1 -
+                           3)
         ]
         # Wait for the actors to finish being created.
         ray.get([actor.get_location_and_ids.remote() for actor in more_actors])
11 changes: 6 additions & 5 deletions test/array_test.py
@@ -67,11 +67,12 @@ def testAssemble(self):
         b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
         x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE],
                          np.array([[a], [b]]))
-        assert_equal(x.assemble(),
-                     np.vstack([
-                         np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
-                         np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])
-                     ]))
+        assert_equal(
+            x.assemble(),
+            np.vstack([
+                np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
+                np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE])
+            ]))
 
     def testMethods(self):
         for module in [
5 changes: 2 additions & 3 deletions test/failure_test.py
@@ -387,9 +387,8 @@ def put_arg_task():
         # on the one before it. The result of the first task should get
         # evicted.
         args = []
-        arg = single_dependency.remote(0,
-                                       np.zeros(
-                                           object_size, dtype=np.uint8))
+        arg = single_dependency.remote(
+            0, np.zeros(object_size, dtype=np.uint8))
         for i in range(num_objects):
             arg = single_dependency.remote(i, arg)
             args.append(arg)
10 changes: 4 additions & 6 deletions test/runtest.py
@@ -180,12 +180,10 @@ class CustomError(Exception):
 TUPLE_OBJECTS = [(obj, ) for obj in BASE_OBJECTS]
 # The check that type(obj).__module__ != "numpy" should be unnecessary, but
 # otherwise this seems to fail on Mac OS X on Travis.
-DICT_OBJECTS = (
-    [{
-        obj: obj
-    } for obj in PRIMITIVE_OBJECTS
-     if (obj.__hash__ is not None and type(obj).__module__ != "numpy")] +
-    [{
+DICT_OBJECTS = ([{
+    obj: obj
+} for obj in PRIMITIVE_OBJECTS if (
+    obj.__hash__ is not None and type(obj).__module__ != "numpy")] + [{
     0: obj
 } for obj in BASE_OBJECTS] + [{
     Foo(123): Foo(456)