Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking ray.get/wait inside async context will warn instead of error #7262

Merged
merged 5 commits into from
Mar 15, 2020
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,10 @@ def show_in_webui(message, key="", dtype="text"):
worker.core_worker.set_webui_display(key.encode(), message_encoded)


# Global varaible to make sure we only send out the warning once
blocking_get_inside_async_warned = False


def get(object_ids, timeout=None):
"""Get a remote object or a list of remote objects from the object store.

Expand All @@ -1454,7 +1458,7 @@ def get(object_ids, timeout=None):
object has been created). If object_ids is a list, then the objects
corresponding to each object in the list will be returned.

This method will error will error if it's running inside async context,
This method will issue a warning if it's running inside async context,
you can use ``await object_id`` instead of ``ray.get(object_id)``. For
a list of object ids, you can use ``await asyncio.gather(*object_ids)``.

Expand All @@ -1479,9 +1483,12 @@ def get(object_ids, timeout=None):
if hasattr(
worker,
"core_worker") and worker.core_worker.current_actor_is_asyncio():
raise RayError("Using blocking ray.get inside async actor. "
"This blocks the event loop. Please "
"use `await` on object id with asyncio.gather.")
global blocking_get_inside_async_warned
if not blocking_get_inside_async_warned:
logger.warning("Using blocking ray.get inside async actor. "
"This blocks the event loop. Please "
"use `await` on object id with asyncio.gather.")
simon-mo marked this conversation as resolved.
Show resolved Hide resolved
blocking_get_inside_async_warned = True

with profiling.profile("ray.get"):
is_individual_id = isinstance(object_ids, ray.ObjectID)
Expand Down Expand Up @@ -1547,6 +1554,10 @@ def put(value, weakref=False):
return object_id


# Global varaible to make sure we only send out the warning once
simon-mo marked this conversation as resolved.
Show resolved Hide resolved
blocking_wait_inside_async_warned = False


def wait(object_ids, num_returns=1, timeout=None):
"""Return a list of IDs that are ready and a list of IDs that are not.

Expand All @@ -1565,8 +1576,9 @@ def wait(object_ids, num_returns=1, timeout=None):
precede B in the ready list. This also holds true if A and B are both in
the remaining list.

This method will error if it's running inside an async context. Instead of
``ray.wait(object_ids)``, you can use ``await asyncio.wait(object_ids)``.
This method will issue a warning if it's running inside an async context.
Instead of ``ray.wait(object_ids)``, you can use
``await asyncio.wait(object_ids)``.

Args:
object_ids (List[ObjectID]): List of object IDs for objects that may or
Expand All @@ -1584,9 +1596,12 @@ def wait(object_ids, num_returns=1, timeout=None):
if hasattr(worker,
"core_worker") and worker.core_worker.current_actor_is_asyncio(
) and timeout != 0:
raise RayError("Using blocking ray.wait inside async method. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.wait. ")
global blocking_wait_inside_async_warned
if not blocking_wait_inside_async_warned:
logger.warning("Using blocking ray.wait inside async method. "
"This blocks the event loop. Please use `await` "
"on object id with asyncio.wait. ")
blocking_wait_inside_async_warned = True

if isinstance(object_ids, ObjectID):
raise TypeError(
Expand Down