Skip to content

Commit

Permalink
Blocking ray.get/wait inside async context will warn instead of error (
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-mo authored Mar 15, 2020
1 parent 6b37be9 commit 3f1fcaa
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,10 @@ def show_in_webui(message, key="", dtype="text"):
worker.core_worker.set_webui_display(key.encode(), message_encoded)


# Global varaible to make sure we only send out the warning once
blocking_get_inside_async_warned = False


def get(object_ids, timeout=None):
"""Get a remote object or a list of remote objects from the object store.
Expand All @@ -1456,7 +1460,7 @@ def get(object_ids, timeout=None):
object has been created). If object_ids is a list, then the objects
corresponding to each object in the list will be returned.
This method will error will error if it's running inside async context,
This method will issue a warning if it's running inside async context,
you can use ``await object_id`` instead of ``ray.get(object_id)``. For
a list of object ids, you can use ``await asyncio.gather(*object_ids)``.
Expand All @@ -1481,9 +1485,13 @@ def get(object_ids, timeout=None):
if hasattr(
worker,
"core_worker") and worker.core_worker.current_actor_is_asyncio():
raise RayError("Using blocking ray.get inside async actor. "
"This blocks the event loop. Please "
"use `await` on object id with asyncio.gather.")
global blocking_get_inside_async_warned
if not blocking_get_inside_async_warned:
logger.warning("Using blocking ray.get inside async actor. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.gather if you want to "
"yield execution to the event loop instead.")
blocking_get_inside_async_warned = True

with profiling.profile("ray.get"):
is_individual_id = isinstance(object_ids, ray.ObjectID)
Expand Down Expand Up @@ -1549,6 +1557,10 @@ def put(value, weakref=False):
return object_id


# Global variable to make sure we only send out the warning once.
blocking_wait_inside_async_warned = False


def wait(object_ids, num_returns=1, timeout=None):
"""Return a list of IDs that are ready and a list of IDs that are not.
Expand All @@ -1567,8 +1579,9 @@ def wait(object_ids, num_returns=1, timeout=None):
precede B in the ready list. This also holds true if A and B are both in
the remaining list.
This method will error if it's running inside an async context. Instead of
``ray.wait(object_ids)``, you can use ``await asyncio.wait(object_ids)``.
This method will issue a warning if it's running inside an async context.
Instead of ``ray.wait(object_ids)``, you can use
``await asyncio.wait(object_ids)``.
Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
Expand All @@ -1586,9 +1599,12 @@ def wait(object_ids, num_returns=1, timeout=None):
if hasattr(worker,
"core_worker") and worker.core_worker.current_actor_is_asyncio(
) and timeout != 0:
raise RayError("Using blocking ray.wait inside async method. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.wait. ")
global blocking_wait_inside_async_warned
if not blocking_wait_inside_async_warned:
logger.warning("Using blocking ray.wait inside async method. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.wait. ")
blocking_wait_inside_async_warned = True

if isinstance(object_ids, ObjectID):
raise TypeError(
Expand Down

0 comments on commit 3f1fcaa

Please sign in to comment.