Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/ray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def format_error_message(exception_message, task_exception=False):
"""
lines = exception_message.split("\n")
if task_exception:
# For errors that occur inside of tasks, remove lines 1, 2, 3, and 4,
# which are always the same, they just contain information about the
# main loop.
lines = lines[0:1] + lines[5:]
# For errors that occur inside of tasks, remove lines 1 and 2, which are
# always the same; they just contain information about the worker code.
lines = lines[0:1] + lines[3:]
pass
return "\n".join(lines)


Expand Down
30 changes: 16 additions & 14 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1744,32 +1744,31 @@ def print_error_messages(worker):
# which process the error came from (e.g., a worker or a plasma store).
# Currently all error messages come from workers.

helpful_message = """
You can inspect errors by running

ray.error_info()

If this driver is hanging, start a new one with

ray.init(redis_address="{}")
""".format(worker.redis_address)

worker.error_message_pubsub_client = worker.redis_client.pubsub()
# Errors that are published after the call to
# error_message_pubsub_client.subscribe and before the call to
# error_message_pubsub_client.listen will still be processed in the loop.
worker.error_message_pubsub_client.subscribe("__keyspace@0__:ErrorKeys")
num_errors_received = 0

# Keep a set of all the error messages that we've seen so far in order to
# avoid printing the same error message repeatedly. This is especially
# important when running a script inside of a tool like screen where
# scrolling is difficult.
old_error_messages = set()

# Get the errors that occurred before the call to subscribe.
with worker.lock:
error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1)
for error_key in error_keys:
if error_applies_to_driver(error_key, worker=worker):
error_message = worker.redis_client.hget(
error_key, "message").decode("ascii")
print(error_message)
print(helpful_message)
if error_message not in old_error_messages:
print(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
num_errors_received += 1

try:
Expand All @@ -1780,8 +1779,11 @@ def print_error_messages(worker):
if error_applies_to_driver(error_key, worker=worker):
error_message = worker.redis_client.hget(
error_key, "message").decode("ascii")
print(error_message)
print(helpful_message)
if error_message not in old_error_messages:
print(error_message)
old_error_messages.add(error_message)
else:
print("Suppressing duplicate error message.")
num_errors_received += 1
except redis.ConnectionError:
# When Redis terminates the listen call will throw a ConnectionError,
Expand Down
12 changes: 12 additions & 0 deletions test/failure_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ def testFailedTask(self):
# ray.get should throw an exception.
self.assertTrue(False)

@ray.remote
def f():
raise Exception("This function failed.")

try:
ray.get(f.remote())
except Exception as e:
self.assertIn("This function failed.", str(e))
else:
# ray.get should throw an exception.
self.assertTrue(False)

def testFailImportingRemoteFunction(self):
ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

Expand Down