Cannot call ray.get inside async actor (not a request for async get) #7197

Closed
jsuarez5341 opened this issue Feb 17, 2020 · 8 comments · Fixed by #7262

@jsuarez5341
Contributor

jsuarez5341 commented Feb 17, 2020

Problem: ray.get not allowed inside async actors

I have been discussing this issue with @edoakes on Slack. Apparently the decision was made to raise a hard error on ray.get inside async actors, to prevent people from accidentally running blocking code inside otherwise fully async tasks. The problem is that blocking code is genuinely needed inside lightly async tasks.

Key Example: Ray.signal is quite buggy and sadly hasn't seen much love lately (and I have heard it may be deprecated). One possible replacement is to use the async API to add queues to synchronous actors, so that the queues function as async inboxes. In this case, you would want a light async workload, such as adding items to a queue (represented by asyncWork in the snippet below). You wouldn't want most of your run() method preempted; instead, you would designate a small segment where control can be yielded using asyncio.sleep() (where I have time.sleep() now).

Impact: Point-to-point communication is a key feature in many distributed applications. Without ray.get inside async actors, it's not really possible to replicate the functionality of ray.signal with the async api (at least I couldn't find a solution). Adding an allow_blocking flag or similar to ray.init would solve this issue for now. I'd like to emphasize that this really isn't a good long term solution. Ray.signal is a much, much better API for what it's intended to do. Async code introduces much more complexity than signaling, so having async replace signal is far from ideal unless we can come up with a really easy plug-and-play async queue for use with otherwise synchronous actors.
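
To make the inbox idea above concrete, here is a minimal sketch in plain asyncio, without Ray. The `Worker` class, its `send`, `drain`, and `run` methods, and the `demo` driver are hypothetical names used only for illustration. The sketch assumes messages are enqueued through a light async method and consumed by synchronous code that yields control at one designated point; it is not a drop-in replacement for ray.signal.

```python
import asyncio


class Worker:
    """Hypothetical worker with an async inbox (illustration only, no Ray)."""

    def __init__(self):
        # Create the queue while the event loop is running (see demo below).
        self.inbox = asyncio.Queue()
        self.processed = []

    async def send(self, message):
        # Light async work: just enqueue the message.
        await self.inbox.put(message)

    def drain(self):
        # Synchronous code: take everything currently queued, without waiting.
        while not self.inbox.empty():
            self.processed.append(self.inbox.get_nowait())

    async def run(self, steps):
        for _ in range(steps):
            # Synchronous work goes here; it runs without interruption.
            self.drain()
            # The single designated point where other coroutines may run.
            await asyncio.sleep(0)


async def demo():
    worker = Worker()
    runner = asyncio.create_task(worker.run(steps=3))
    await worker.send("a")
    await worker.send("b")
    await runner
    return worker.processed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The synchronous part of `run` never needs to be rewritten as a coroutine; only the loop that drives it yields.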

Ray version and other system information (Python version, TensorFlow version, OS): Latest Ray wheel on Ubuntu (though this should hold on all systems -- seems more of a design choice that has outlived its usefulness than anything)

Reproduction:

import ray
import time
import asyncio

@ray.remote
def bar():
   return

@ray.remote
class Foo:
   def run(self):
      while True:
         time.sleep(1)
         ray.get(bar.remote())

   async def asyncWork(self):
      pass

if __name__ == '__main__':
   ray.init()
   foo = Foo.remote()
   foo.run.remote()
   while True:
      time.sleep(1)
      foo.asyncWork.remote()

  • [x] I have verified my script runs in a clean environment and reproduces the issue.
  • [x] I have verified the issue also occurs with the latest wheels.

jsuarez5341 added the bug label (Something that is supposed to be working; but isn't) on Feb 17, 2020

@jsuarez5341
Contributor Author

Edit: fixed code format

simon-mo self-assigned this on Feb 17, 2020

@simon-mo
Contributor

It seems the alternative async version of the run method is

async def run(self):
   while True:
      time.sleep(1)  # simulate work, this won't be preempted
      await bar.remote()

The blocking code itself won't be preempted. asyncio will only context switch while you are awaiting bar.remote(). When the context switch happens, other coroutines are allowed to run.
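
The scheduling behaviour described above can be checked with plain asyncio, independent of Ray. This is a minimal sketch: the `run`, `async_work`, and `main` names and the log messages are illustrative, and `asyncio.sleep(0)` stands in for awaiting a remote result.

```python
import asyncio
import time


async def run(log, iterations=2):
    for i in range(iterations):
        log.append(f"run {i}: blocking start")
        time.sleep(0.01)  # blocking call: the event loop cannot switch tasks here
        log.append(f"run {i}: blocking end")
        await asyncio.sleep(0)  # explicit yield: other coroutines may run now


async def async_work(log, iterations=2):
    for i in range(iterations):
        log.append(f"async_work {i}")
        await asyncio.sleep(0)


async def main():
    log = []
    await asyncio.gather(run(log), async_work(log))
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

Every "blocking start" entry is immediately followed by its "blocking end" entry; `async_work` only gets a turn after `run` reaches its `await`.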

@jsuarez5341
Contributor Author

That works if you happen to be calling ray.get from an async method, but what if you want to call it from a synchronous method?

@simon-mo
Contributor

Can you elaborate on why you need to call it from a synchronous method? Wrapping your synchronous method inside an async method has no performance penalty, and there won't be any preemption. You have to explicitly yield control with await.

@ericl
Contributor

ericl commented Feb 17, 2020

Isn't the bigger issue that run will block the entire event loop unless it's an async def? Even if we allowed get, the example above still wouldn't work. You would need:

import ray
import time
import asyncio

@ray.remote
def bar():
   return

@ray.remote
class Foo:
   async def run(self):
      while True:
         await asyncio.sleep(1)
         await bar.remote()
         print("running")

   async def asyncWork(self):
      print("do async poll")

if __name__ == '__main__':
   ray.init()
   foo = Foo.remote()
   foo.run.remote()
   while True:
      time.sleep(1)
      ray.get(foo.asyncWork.remote())

@simon-mo can we raise a warning if a non-async method is called?

@jsuarez5341
Contributor Author

@ericl Yes I missed an async def in my example, thank you.

@simon-mo The larger issue is that having to await the result, instead of calling ray.get, forces you to redefine a potentially large number of functions as asynchronous (see below).

import ray
import time
import asyncio

@ray.remote
def bar():
   return

@ray.remote
class Foo:
   async def run(self):
      while True:
         time.sleep(1)
         self.f1()

   def f1(self):
      return self.f2()

   def f2(self):
      return self.f3()

   def f3(self):
      return ray.get(bar.remote())

if __name__ == '__main__':
   ray.init()
   foo = Foo.remote()
   foo.run.remote()
   while True:
      time.sleep(1)
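
For comparison, here is a sketch of what the same call chain looks like once the innermost call has to be awaited. It uses plain asyncio rather than Ray: `fetch` is a hypothetical stand-in for awaiting `bar.remote()`, and the return value 42 and the `iterations` parameter are made up for the example.

```python
import asyncio


async def fetch():
    """Hypothetical stand-in for awaiting a remote result such as bar.remote()."""
    await asyncio.sleep(0)
    return 42


class Foo:
    # Only f3 touches the remote result, yet f2 and f1 must also become
    # coroutines so that the await can propagate up to run().
    async def f3(self):
        return await fetch()

    async def f2(self):
        return await self.f3()

    async def f1(self):
        return await self.f2()

    async def run(self, iterations=3):
        results = []
        for _ in range(iterations):
            results.append(await self.f1())
        return results


if __name__ == "__main__":
    print(asyncio.run(Foo().run()))
```

Each intermediate function changes signature even though its own logic is untouched, which is the refactoring cost described above.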

@simon-mo
Contributor

#7262 changes the error to a warning.
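
As an illustration of the general pattern only (this is not the code from #7262), a blocking call can warn rather than raise when it detects a running event loop. The `in_event_loop`, `blocking_get`, and `caller` names and the warning text are hypothetical; the sketch relies only on the standard `asyncio.get_running_loop()` and `warnings.warn()`.

```python
import asyncio
import warnings


def in_event_loop():
    """Return True if called from code running inside an asyncio event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def blocking_get(value):
    """Hypothetical blocking call that warns, instead of failing, in async code."""
    if in_event_loop():
        warnings.warn(
            "blocking call inside a running event loop; consider awaiting instead",
            RuntimeWarning,
            stacklevel=2,
        )
    return value


async def caller():
    # Still works from a coroutine, but emits a RuntimeWarning.
    return blocking_get(1)


if __name__ == "__main__":
    print(blocking_get(1))        # no warning outside an event loop
    print(asyncio.run(caller()))  # warning, but the call goes through
```

A warning keeps the guard against accidental blocking visible while leaving deliberate blocking calls usable.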

@jsuarez5341
Contributor Author

Much better, thank you
