diff --git a/python/ray/autoscaler/updater.py b/python/ray/autoscaler/updater.py index af13f0fecf7b..8d518f8ab7b4 100644 --- a/python/ray/autoscaler/updater.py +++ b/python/ray/autoscaler/updater.py @@ -75,9 +75,9 @@ def run(self): self.provider.set_node_tags(self.node_id, {TAG_RAY_NODE_STATUS: "UpdateFailed"}) if self.logfile is not None: - print("----- BEGIN REMOTE LOGS -----\n" + open( - self.logfile.name).read() + "\n----- END REMOTE LOGS -----" - ) + print("----- BEGIN REMOTE LOGS -----\n" + + open(self.logfile.name).read() + + "\n----- END REMOTE LOGS -----") raise e self.provider.set_node_tags( self.node_id, { diff --git a/python/ray/experimental/array/distributed/linalg.py b/python/ray/experimental/array/distributed/linalg.py index 7f473715d022..8bf45fe6bb12 100644 --- a/python/ray/experimental/array/distributed/linalg.py +++ b/python/ray/experimental/array/distributed/linalg.py @@ -76,10 +76,9 @@ def tsqr(a): lower = [a.shape[1], 0] upper = [2 * a.shape[1], core.BLOCK_SIZE] ith_index //= 2 - q_block_current = ra.dot.remote(q_block_current, - ra.subarray.remote( - q_tree[ith_index, j], lower, - upper)) + q_block_current = ra.dot.remote( + q_block_current, + ra.subarray.remote(q_tree[ith_index, j], lower, upper)) q_result.objectids[i] = q_block_current r = current_rs[0] return q_result, ray.get(r) @@ -222,10 +221,10 @@ def qr(a): y_col_block = core.subblocks.remote(y_res, [], [i]) q = core.subtract.remote( q, - core.dot.remote(y_col_block, - core.dot.remote( - Ts[i], - core.dot.remote( - core.transpose.remote(y_col_block), q)))) + core.dot.remote( + y_col_block, + core.dot.remote( + Ts[i], + core.dot.remote(core.transpose.remote(y_col_block), q)))) return ray.get(q), r_res diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 0cea70b9ecee..3ded77401b20 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -749,8 +749,8 @@ def micros_rel(ts): "name": "SubmitTask", "args": {}, - "id": (parent_info["worker_id"] + - str(micros(min(parent_times)))) + "id": (parent_info["worker_id"] + str( + micros(min(parent_times)))) } full_trace.append(parent) @@ -824,8 +824,8 @@ def micros_rel(ts): "name": "SubmitTask", "args": {}, - "id": (parent_info["worker_id"] + - str(micros(min(parent_times)))) + "id": (parent_info["worker_id"] + str( + micros(min(parent_times)))) } full_trace.append(parent) diff --git a/python/ray/experimental/ui.py b/python/ray/experimental/ui.py index adae70692265..1e6996cd303f 100644 --- a/python/ray/experimental/ui.py +++ b/python/ray/experimental/ui.py @@ -451,8 +451,8 @@ def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks, # Create the distribution to plot distr = [] for task_id, data in tasks.items(): - distr.append( - data["store_outputs_end"] - data["get_arguments_start"]) + distr.append(data["store_outputs_end"] - + data["get_arguments_start"]) # Create a histogram from the distribution top, bin_edges = np.histogram(distr, bins="auto") @@ -520,10 +520,10 @@ def compute_utilizations(abs_earliest, # Walk over each time bucket that this task intersects, adding the # amount of time that the task intersects within each bucket for bucket_idx in range(start_bucket, end_bucket + 1): - bucket_start_time = (( - earliest_time + bucket_idx) * bucket_time_length) - bucket_end_time = ((earliest_time + - (bucket_idx + 1)) * bucket_time_length) + bucket_start_time = ( + (earliest_time + bucket_idx) * bucket_time_length) + bucket_end_time = ( + (earliest_time + (bucket_idx + 1)) * bucket_time_length) task_start_time_within_bucket = max(task_start_time, bucket_start_time) diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 465d87ef3e31..b0f268cd4a8a 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -39,8 +39,9 @@ def __init__(self, redis_ip_address, redis_port, node_ip_address): def update_log_filenames(self): """Get the most up-to-date list of log files to monitor from Redis.""" num_current_log_files = len(self.log_files) - new_log_filenames = self.redis_client.lrange("LOG_FILENAMES:{}".format( - self.node_ip_address), num_current_log_files, -1) + new_log_filenames = self.redis_client.lrange( + "LOG_FILENAMES:{}".format(self.node_ip_address), + num_current_log_files, -1) for log_filename in new_log_filenames: print("Beginning to track file {}".format(log_filename)) assert log_filename not in self.log_files diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 20c5ce17b4cb..734daf364258 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -189,10 +189,9 @@ def cleanup_object_table(self): if manager in self.dead_plasma_managers: # If the object was on a dead plasma manager, remove that # location entry. - ok = self.state._execute_command(object_id, - "RAY.OBJECT_TABLE_REMOVE", - object_id.id(), - hex_to_binary(manager)) + ok = self.state._execute_command( + object_id, "RAY.OBJECT_TABLE_REMOVE", object_id.id(), + hex_to_binary(manager)) if ok != b"OK": log.warn("Failed to remove object location for dead " "plasma manager.") @@ -507,8 +506,8 @@ def run(self): log.debug("{} dead local schedulers, {} plasma managers total, {} " "dead plasma managers".format( len(self.dead_local_schedulers), - (len(self.live_plasma_managers) + - len(self.dead_plasma_managers)), + (len(self.live_plasma_managers) + len( + self.dead_plasma_managers)), len(self.dead_plasma_managers))) # Handle messages from the subscription channels. diff --git a/python/ray/tune/hyperband.py b/python/ray/tune/hyperband.py index bbc4fa0077bc..45a716a3f982 100644 --- a/python/ray/tune/hyperband.py +++ b/python/ray/tune/hyperband.py @@ -391,9 +391,9 @@ def _calculate_total_work(self, n, r, s): def __repr__(self): status = ", ".join([ - "Max Size (n)={}".format(self._n), "Milestone (r)={}".format( - self._cumul_r), "completed={:.1%}".format( - self.completion_percentage()) + "Max Size (n)={}".format(self._n), + "Milestone (r)={}".format(self._cumul_r), + "completed={:.1%}".format(self.completion_percentage()) ]) counts = collections.Counter([t.status for t in self._all_trials]) trial_statuses = ", ".join( diff --git a/python/ray/tune/test/trial_scheduler_test.py b/python/ray/tune/test/trial_scheduler_test.py index a15448db79c9..97669626f8f7 100644 --- a/python/ray/tune/test/trial_scheduler_test.py +++ b/python/ray/tune/test/trial_scheduler_test.py @@ -370,12 +370,14 @@ def testTrialErrored(self): mock_runner._launch_trial(t) sched.on_trial_error(mock_runner, t3) - self.assertEqual(TrialScheduler.PAUSE, - sched.on_trial_result(mock_runner, t1, - result(stats[str(1)]["r"], 10))) - self.assertEqual(TrialScheduler.CONTINUE, - sched.on_trial_result(mock_runner, t2, - result(stats[str(1)]["r"], 10))) + self.assertEqual( + TrialScheduler.PAUSE, + sched.on_trial_result(mock_runner, t1, + result(stats[str(1)]["r"], 10))) + self.assertEqual( + TrialScheduler.CONTINUE, + sched.on_trial_result(mock_runner, t2, + result(stats[str(1)]["r"], 10))) def testTrialErrored2(self): """Check successive halving happened even when last trial failed""" @@ -405,12 +407,14 @@ def testTrialEndedEarly(self): mock_runner._launch_trial(t) sched.on_trial_complete(mock_runner, t3, result(1, 12)) - self.assertEqual(TrialScheduler.PAUSE, - sched.on_trial_result(mock_runner, t1, - result(stats[str(1)]["r"], 10))) - self.assertEqual(TrialScheduler.CONTINUE, - sched.on_trial_result(mock_runner, t2, - result(stats[str(1)]["r"], 10))) + self.assertEqual( + TrialScheduler.PAUSE, + sched.on_trial_result(mock_runner, t1, + result(stats[str(1)]["r"], 10))) + self.assertEqual( + TrialScheduler.CONTINUE, + sched.on_trial_result(mock_runner, t2, + result(stats[str(1)]["r"], 10))) def testTrialEndedEarly2(self): """Check successive halving happened even when last trial failed""" @@ -449,13 +453,13 @@ def testAddAfterHalving(self): self.assertEqual(len(sched._state["bracket"].current_trials()), 2) # Make sure that newly added trial gets fair computation (not just 1) - self.assertEqual(TrialScheduler.CONTINUE, - sched.on_trial_result(mock_runner, t, - result(init_units, 12))) + self.assertEqual( + TrialScheduler.CONTINUE, + sched.on_trial_result(mock_runner, t, result(init_units, 12))) new_units = init_units + int(init_units * sched._eta) - self.assertEqual(TrialScheduler.PAUSE, - sched.on_trial_result(mock_runner, t, - result(new_units, 12))) + self.assertEqual( + TrialScheduler.PAUSE, + sched.on_trial_result(mock_runner, t, result(new_units, 12))) def testAlternateMetrics(self): """Checking that alternate metrics will pass.""" diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 21e8907e1aa6..7d7442572e6a 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -174,8 +174,8 @@ def stop(self, error=False, error_msg=None, stop_logger=True): try: if error_msg and self.logdir: self.num_failures += 1 - error_file = os.path.join(self.logdir, "error_{}.txt".format( - date_str())) + error_file = os.path.join(self.logdir, + "error_{}.txt".format(date_str())) with open(error_file, "w") as f: f.write(error_msg) self.error_file = error_file @@ -259,9 +259,10 @@ def location_string(hostname, pid): return '{} pid={}'.format(hostname, pid) pieces = [ - '{} [{}]'.format(self._status_string(), - location_string(self.last_result.hostname, - self.last_result.pid)), + '{} [{}]'.format( + self._status_string(), + location_string(self.last_result.hostname, + self.last_result.pid)), '{} s'.format(int(self.last_result.time_total_s)), '{} ts'.format( int(self.last_result.timesteps_total)) ] @@ -281,8 +282,10 @@ def location_string(hostname, pid): return ', '.join(pieces) def _status_string(self): - return "{}{}".format(self.status, ", {} failures: {}".format( - self.num_failures, self.error_file) if self.error_file else "") + return "{}{}".format( + self.status, ", {} failures: {}".format(self.num_failures, + self.error_file) + if self.error_file else "") def has_checkpoint(self): return self._checkpoint_path is not None or \ diff --git a/python/ray/worker.py b/python/ray/worker.py index 3b4495d25cfd..ccc0028f1d23 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -845,10 +845,9 @@ def _process_task(self, task): e, None) return except Exception as e: - self._handle_process_task_failure(function_id, return_object_ids, - e, - ray.utils.format_error_message( - traceback.format_exc())) + self._handle_process_task_failure( + function_id, return_object_ids, e, + ray.utils.format_error_message(traceback.format_exc())) return # Execute the task. @@ -881,10 +880,9 @@ def _process_task(self, task): outputs = (outputs, ) self._store_outputs_in_objstore(return_object_ids, outputs) except Exception as e: - self._handle_process_task_failure(function_id, return_object_ids, - e, - ray.utils.format_error_message( - traceback.format_exc())) + self._handle_process_task_failure( + function_id, return_object_ids, e, + ray.utils.format_error_message(traceback.format_exc())) def _handle_process_task_failure(self, function_id, return_object_ids, error, backtrace): diff --git a/test/actor_test.py b/test/actor_test.py index 1d1c5ed18b20..51e37e8466b5 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1038,14 +1038,14 @@ def get_location_and_ids(self): def locations_to_intervals_for_many_tasks(): # Launch a bunch of GPU tasks. locations_ids_and_intervals = ray.get([ - f1.remote() for _ in range( - 5 * num_local_schedulers * num_gpus_per_scheduler) + f1.remote() for _ in range(5 * num_local_schedulers * + num_gpus_per_scheduler) ] + [ - f2.remote() for _ in range( - 5 * num_local_schedulers * num_gpus_per_scheduler) + f2.remote() for _ in range(5 * num_local_schedulers * + num_gpus_per_scheduler) ] + [ - f1.remote() for _ in range( - 5 * num_local_schedulers * num_gpus_per_scheduler) + f1.remote() for _ in range(5 * num_local_schedulers * + num_gpus_per_scheduler) ]) locations_to_intervals = collections.defaultdict(lambda: []) @@ -1108,8 +1108,9 @@ def locations_to_intervals_for_many_tasks(): # Create more actors to fill up all the GPUs. more_actors = [ - Actor1.remote() for _ in range( - num_local_schedulers * num_gpus_per_scheduler - 1 - 3) + Actor1.remote() + for _ in range(num_local_schedulers * num_gpus_per_scheduler - 1 - + 3) ] # Wait for the actors to finish being created. ray.get([actor.get_location_and_ids.remote() for actor in more_actors]) diff --git a/test/array_test.py b/test/array_test.py index 567ac81152c6..04ae66d37258 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -67,11 +67,12 @@ def testAssemble(self): b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE]) x = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE], np.array([[a], [b]])) - assert_equal(x.assemble(), - np.vstack([ - np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]), - np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]) - ])) + assert_equal( + x.assemble(), + np.vstack([ + np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]), + np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]) + ])) def testMethods(self): for module in [ diff --git a/test/failure_test.py b/test/failure_test.py index b3260a1744e2..43eef645530d 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -387,9 +387,8 @@ def put_arg_task(): # on the one before it. The result of the first task should get # evicted. args = [] - arg = single_dependency.remote(0, - np.zeros( - object_size, dtype=np.uint8)) + arg = single_dependency.remote( + 0, np.zeros(object_size, dtype=np.uint8)) for i in range(num_objects): arg = single_dependency.remote(i, arg) args.append(arg) diff --git a/test/runtest.py b/test/runtest.py index ab24f5ea1256..6b7d4bdc4e9e 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -180,12 +180,10 @@ class CustomError(Exception): TUPLE_OBJECTS = [(obj, ) for obj in BASE_OBJECTS] # The check that type(obj).__module__ != "numpy" should be unnecessary, but # otherwise this seems to fail on Mac OS X on Travis. -DICT_OBJECTS = ( - [{ - obj: obj - } for obj in PRIMITIVE_OBJECTS - if (obj.__hash__ is not None and type(obj).__module__ != "numpy")] + - [{ +DICT_OBJECTS = ([{ + obj: obj +} for obj in PRIMITIVE_OBJECTS if ( + obj.__hash__ is not None and type(obj).__module__ != "numpy")] + [{ 0: obj } for obj in BASE_OBJECTS] + [{ Foo(123): Foo(456)