diff --git a/python/ray/utils.py b/python/ray/utils.py index 9fa3a4fe165f..b8e993d5a738 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -38,10 +38,10 @@ def format_error_message(exception_message, task_exception=False): """ lines = exception_message.split("\n") if task_exception: - # For errors that occur inside of tasks, remove lines 1, 2, 3, and 4, - # which are always the same, they just contain information about the - # main loop. - lines = lines[0:1] + lines[5:] + # For errors that occur inside of tasks, remove lines 1 and 2 which are + # always the same, they just contain information about the worker code. + lines = lines[0:1] + lines[3:] + pass return "\n".join(lines) diff --git a/python/ray/worker.py b/python/ray/worker.py index ccc0028f1d23..197c8335deb6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1744,16 +1744,6 @@ def print_error_messages(worker): # which process the error came from (e.g., a worker or a plasma store). # Currently all error messages come from workers. - helpful_message = """ - You can inspect errors by running - - ray.error_info() - - If this driver is hanging, start a new one with - - ray.init(redis_address="{}") - """.format(worker.redis_address) - worker.error_message_pubsub_client = worker.redis_client.pubsub() # Exports that are published after the call to # error_message_pubsub_client.subscribe and before the call to @@ -1761,6 +1751,12 @@ def print_error_messages(worker): worker.error_message_pubsub_client.subscribe("__keyspace@0__:ErrorKeys") num_errors_received = 0 + # Keep a set of all the error messages that we've seen so far in order to + # avoid printing the same error message repeatedly. This is especially + # important when running a script inside of a tool like screen where + # scrolling is difficult. + old_error_messages = set() + # Get the exports that occurred before the call to subscribe. with worker.lock: error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) @@ -1768,8 +1764,11 @@ def print_error_messages(worker): if error_applies_to_driver(error_key, worker=worker): error_message = worker.redis_client.hget( error_key, "message").decode("ascii") - print(error_message) - print(helpful_message) + if error_message not in old_error_messages: + print(error_message) + old_error_messages.add(error_message) + else: + print("Suppressing duplicate error message.") num_errors_received += 1 try: @@ -1780,8 +1779,11 @@ def print_error_messages(worker): if error_applies_to_driver(error_key, worker=worker): error_message = worker.redis_client.hget( error_key, "message").decode("ascii") - print(error_message) - print(helpful_message) + if error_message not in old_error_messages: + print(error_message) + old_error_messages.add(error_message) + else: + print("Suppressing duplicate error message.") num_errors_received += 1 except redis.ConnectionError: # When Redis terminates the listen call will throw a ConnectionError, diff --git a/test/failure_test.py b/test/failure_test.py index 43eef645530d..9a55ba34d007 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -64,6 +64,18 @@ def testFailedTask(self): # ray.get should throw an exception. self.assertTrue(False) + @ray.remote + def f(): + raise Exception("This function failed.") + + try: + ray.get(f.remote()) + except Exception as e: + self.assertIn("This function failed.", str(e)) + else: + # ray.get should throw an exception. + self.assertTrue(False) + def testFailImportingRemoteFunction(self): ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)