-
Notifications
You must be signed in to change notification settings - Fork 7k
Closed
Description
This might be intended behavior, but naive use of ray.wait() can starve workers from having their results processed. For example, in this simple example where tasks take basically no time and the driver processes them slowly one at a time, some results can get delayed for hundreds of wait steps.
import ray
import sys
import time
@ray.remote
def f(i):
return i
ray.init(num_cpus=4)
i = 0
remaining_ids = [f.remote(i) for _ in range(5)]
for _ in range(1000):
[ready], remaining_ids = ray.wait(remaining_ids, 1)
i_old = ray.get(ready)
print(i - i_old, end=" ")
sys.stdout.flush()
time.sleep(.01) # do some work e.g. apply gradients
i += 1
remaining_ids.append(f.remote(i))
Output:
0 1 2 1 4 4 1 4 1 9 1 1 1 1 1 1 1 1 1 11 1 1 16 1 3 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 21 1 1 1 1 1 1 1 48 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 49 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 74 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 223 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 75 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1
Note that you can work around this by waiting on only the first element of the list.
Metadata
Metadata
Assignees
Labels
No labels