
Ray starts too many workers (and may crash) when using nested remote functions. #3644

Closed
robertnishihara opened this issue Dec 27, 2018 · 19 comments · Fixed by #5851
Labels
bug Something that is supposed to be working; but isn't

Comments

@robertnishihara
Collaborator

robertnishihara commented Dec 27, 2018

This is very similar to the earlier issue #231. One proposed solution was implemented by @stephanie-wang in #425.

Users sometimes encounter variants of the following bug and have no idea what is going wrong.

Running the following workload requires about 500 workers to be started (to execute all of the g tasks, which are blocked in the call to ray.get, before the f tasks start getting executed).

import ray
ray.init()

@ray.remote
def f():
    return 1

@ray.remote
def g():
    return sum(ray.get([f.remote() for _ in range(10)]))

ray.get([g.remote() for _ in range(500)])

Workarounds:

  • Start fewer g tasks
  • Divide g into two parts, e.g.,
    @ray.remote
    def g_part_a():
        return [f.remote() for _ in range(10)]
    
    @ray.remote
    def g_part_b(*results):
        return sum(results)
    
    intermediate = ray.get([g_part_a.remote() for _ in range(500)])
    ray.get([g_part_b.remote(*ids) for ids in intermediate])
  • Use custom resources to constrain the number of g tasks running concurrently (suggested by @ericl); a sketch follows below.
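
A minimal sketch of the custom-resource workaround, assuming a node that advertises a hypothetical custom resource named g_slot (the resource name and count are illustrative):

import ray

# Assume the node advertises 4 units of a custom resource called "g_slot".
ray.init(resources={"g_slot": 4})

@ray.remote
def f():
    return 1

# Each g task must hold one "g_slot", so at most 4 g tasks can be running
# (and blocked in ray.get) at any time; the rest wait in the queue.
@ray.remote(resources={"g_slot": 1})
def g():
    return sum(ray.get([f.remote() for _ in range(10)]))

print(ray.get([g.remote() for _ in range(500)]))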

Potential Solutions:

  • Make the scheduler prioritize the f tasks over the g tasks (e.g., the strategy in #425 or some sort of LIFO policy).

In the meantime, we can easily detect that we've started way too many workers and push a warning to the user with a link to some possible workarounds.

cc @stephanie-wang @ericl

@robertnishihara added the bug label Dec 27, 2018
@PMende

PMende commented Feb 19, 2019

I'm looking into using Ray to simplify the parallelization of the training/predicting of a multilevel model in Python. I've encountered this warning in what seems like a fairly simple example as shown below:

import ray


ray.init(num_cpus=6)


@ray.remote
class Foo:
    def __init__(self, x):
        self.x = x

    def calc(self, y):
        return self.x + y

    def adjust_x(self, new_x):
        self.x = new_x

class MultiFoo:
    def __init__(self, num):
        self.foos = [Foo.remote(i) for i in range(num)]

    def calc(self, y):
        return ray.get([foo.calc.remote(y) for foo in self.foos])

    def adjust_foos(self, new_xs):
        results = [
          foo.adjust_x.remote(new_x)
          for foo, new_x in zip(self.foos, new_xs)
        ]
        ray.get(results)

Now, if I instantiate some MultiFoo objects, I can get the warning mentioned above:

In [4]: multifoos = MultiFoo(6)

In [5]: multifoos = MultiFoo(5)

In [6]: multifoos = MultiFoo(8)

2019-02-18 16:49:57,841	ERROR worker.py:1632 -- WARNING: 18 workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

However, I can sometimes instantiate many (e.g., 10) such MultiFoo(18) objects in quick succession, binding them to the same variable, and no such warning pops up.

As this is the issue linked in the warning, I'm posting my question here. Am I doing something incorrectly? Is this something I can safely ignore?

@robertnishihara
Collaborator Author

@PMende, the reason you might not be seeing the error when you create many MultiFoo objects and bind them to the same variable is that the previous MultiFoo objects are going out of scope and destructing the Foo actors that they created (the destructed actor processes are killed and so there are never too many workers started at one time).

When all of the MultiFoo objects are still in scope, then all of the actors must be kept alive simultaneously. Each actor is a separate process, and there are limits to how many processes you can start simultaneously. For example, if you check ulimit -n, that is one upper bound. For this reason, you can't start an arbitrarily large number of actors on a single machine. So if your script is running fine, then yes, you can ignore this warning, but if things crash, it will probably be because too many processes were started simultaneously.
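
A minimal sketch of the scoping behavior described above, reusing the MultiFoo class from the earlier comment (the variable names are illustrative):

# Rebinding the variable drops the previous MultiFoo, so its Foo actor handles
# go out of scope and Ray can terminate those actor processes.
multifoos = MultiFoo(6)
multifoos = MultiFoo(8)  # the 6 actors from the first MultiFoo can now be cleaned up

# Keeping every MultiFoo reachable keeps all of their actor processes alive at
# once, which is what runs into process/file-descriptor limits (e.g., ulimit -n).
all_foos = [MultiFoo(6) for _ in range(10)]  # ~60 actor processes simultaneously

del all_foos  # dropping the last references lets Ray reclaim those workers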

@virtualluke
Contributor

virtualluke commented Feb 20, 2019

Is the real downside to launching too many workers the ulimit -n limit? In the example at the beginning of this issue, what is the impact of those 500 calls that aren't doing anything except blocking?

And maybe I have been using terminology incorrectly: when you start Ray on a node with some number of CPUs (specified in ray start for that node or in init()), you get that many workers on that node, and the remote calls to f and then g would be tasks assigned to workers across the Ray cluster (assigned among (number of Ray nodes) * (number of CPUs per node) workers)? Each worker accepts tasks until the local scheduler throttles assignments on the node (is that right?) Even with a small number of workers, wouldn't 500 such blocking tasks have little impact on the load that the local scheduler uses to potentially throttle the scheduling? Please correct me here - I want to really get this straight.

I am making use of custom resources in some similar cases, but it gets somewhat cumbersome if I want different resource constraints for 3-4 different remote functions, for example (all of these constraints being somewhat artificial throttles I apply for a non-CPU-bound problem, in order to hard-code some sparse scheduling across the cluster). It is similar to the example above in that, instead of blocking explicitly, I may be effectively blocking because of some large asynchronous or IO-bound call rather than a CPU-bound task.

@minimaxir

minimaxir commented May 23, 2019

I am hitting this error just by overloading a single CPU thread w/ asyncio (while hacking the example), without nesting remote calls.

import asyncio
import time
import ray
from ray.experimental import async_api

@ray.remote(num_cpus=0.25)
def f():
    print('async')
    time.sleep(3)
    print('async done')
    return 1

ray.init(object_store_memory=100 * 1000000,
         redis_max_memory=100 * 1000000, num_cpus=1)
loop = asyncio.get_event_loop()
tasks = [async_api.as_future(f.remote())
         for i in range(4)]
results = loop.run_until_complete(
    asyncio.gather(*tasks))
print(results)

@pengzhenghao
Contributor

To the best of my knowledge, there is no such argument that can be passed into ray.remote() to control the total number of workers, the way we do with tune.run(..., config={'num_worker': ...}).

Am I right?
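
One pattern that can bound the number of in-flight tasks (and therefore the number of workers they occupy) is to throttle submission with ray.wait; a minimal sketch, where the cap value is purely illustrative:

import ray

ray.init()

@ray.remote
def work(i):
    return i * i

MAX_IN_FLIGHT = 8  # illustrative cap on concurrently submitted tasks
pending, results = [], []
for i in range(100):
    if len(pending) >= MAX_IN_FLIGHT:
        # Wait for one task to finish before submitting the next one.
        done, pending = ray.wait(pending, num_returns=1)
        results.extend(ray.get(done))
    pending.append(work.remote(i))
results.extend(ray.get(pending))
print(len(results))  # 100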

@PovelikinRostislav

Hi all!

I've updated Ray to the latest version (0.7.7), which includes the #5851 MR meant to fix this issue, but the problem is still here:

2019-12-17 00:04:00,398 WARNING worker.py:1054 -- WARNING: 28 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

My case is a chain of calls, not a nested task submission.

Think of constructing a repeating map-reduce pattern and calling get() on the last reduce.
Is that legal usage? :)
Or should I prefer calling get() on each reduce, or something else?

@robertnishihara
Collaborator Author

@PovelikinRostislav, can you share a minimal example that reproduces the issue?

Also, does the application succeed despite printing the warning? Or does it crash?

@PovelikinRostislav

@robertnishihara, I'm in the process of minimizing the reproducer.
And I'm sorry for the confusion about nested tasks vs. a call chain. As I understood from your documentation, a call chain is literally a sequence of nested tasks.

There are only warnings, but the number of created workers is significant (5-6 workers instead of 1), because I create exactly 1 worker per physical node to avoid over-subscription when using threaded C++ libraries.

@PovelikinRostislav

By the way, can I set some hard limit to prohibit the creation of such workers?

@PovelikinRostislav

I am still experiencing this issue with workers, even with new versions of Ray, but now without warning messages.

But I have worked around this problem by changing the semantics of my functions: instead of the circular dependency where I call ray.get(objs) inside a remote function, I pass all required results as *args, and it works for now.
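
A minimal sketch of that *args pattern (the function names here are illustrative): object refs passed as top-level arguments are resolved by Ray before the task starts, so the reducer never blocks in ray.get while holding a worker.

import ray

ray.init()

@ray.remote
def produce():
    return 1

# The refs are passed as positional arguments, so Ray resolves them to plain
# values before reduce_step runs; no ray.get is needed inside the task.
@ray.remote
def reduce_step(*parts):
    return sum(parts)

refs = [produce.remote() for _ in range(10)]
print(ray.get(reduce_step.remote(*refs)))  # 10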

@nmayhall-vt

It seems I'm hitting this bug, but I don't quite understand the workarounds. My case seems like a simple use case for Ray - I need to do many distinct, CPU-heavy computations reading from the same data objects. So I put the large objects in the object store via ray.put(). Then my remote function reads this data and does one of the computations. The actual code is quite lengthy, but the main flow looks like:

data1 = DataObject()
data2 = DataObject()
data1_id = ray.put(data1)
data2_id = ray.put(data2)

@ray.remote
def process_data(inp):
    data1_id = inp[0]
    data2_id = inp[1]
    subset_info = inp[2]

    data1 = ray.get(data1_id)
    data2 = ray.get(data2_id)

    # compute on data1 and data2 to get the result number
    return result

# fill list of work to do in list called jobs
result_ids = [process_data.remote(i) for i in jobs]
out = ray.get(result_ids)
for result in out:
    print(result)

When I do this I get the error WARNING worker.py:1072 -- WARNING: 36 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

However, if I don't pass data1_id and data2_id in as arguments to the function process_data, and I just define them above the function so they are in scope, then the execution works as expected, I don't get the warning message, and the number of workers never exceeds the number of cores on my computer.
Is this the expected behavior?

@robertnishihara
Collaborator Author

@nmayhall-vt there is something a little subtle happening here.

If you are doing something like

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(data):
    x = ray.get(data[0])
    y = ray.get(data[1])

    # Use x and y

process.remote([x_id, y_id])

Then you should be able to avoid the issue by doing

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(x, y):
    # Use x and y

process.remote(x_id, y_id)

The reason is that when you call ray.get inside of a remote function, Ray will treat the task as "not using any resources" until ray.get returns, and so will potentially schedule additional tasks, which may require additional workers to be created.

Does that make sense?

Also, is it giving the warning and then crashing? Or is it giving the warning and then succeeding?

@nmayhall-vt

@robertnishihara Thanks so much! I think that after giving the warning it would have succeeded, but I can't say for sure, because the issue only showed up for a problem large enough to be too big for my computer to handle.

@alsuhr-c

Is this warning actually a problem if I intend to create a bunch of workers? I'm using ray to parallelize writing something to disk frequently, and don't care about the return value. The calls seem to be succeeding, but the warning message keeps appearing so I'm worried that there is something I'm missing (e.g., maybe some of the processes aren't actually succeeding and I'm just missing them).

Thanks!

@robertnishihara
Collaborator Author

@alsuhr-c if it's running successfully, then there's probably no issue, but if you are creating many more worker processes than the number of CPU cores you have, it's possible you'll run into issues like running out of memory, file descriptors, or some other resource.

@RafalSkolasinski

It is still unclear to me how to control the number of nested tasks. If I have simple code like

import time
import ray

@ray.remote
def a(x):
    return x + ["a"]

@ray.remote
def b(x):
    time.sleep(.1)
    return x + ["b"]

@ray.remote
def c(x):
    return x + ["c"]

and would like to compute a -> b -> c for some list of inputs, I'd assume that this is the right approach:

@ray.remote
def compute(x):
    x = a.remote(x)
    x = b.remote(x)
    x = c.remote(x)
    return ray.get(x)

ray.init()

futures = []
for n in range(500):
    future = compute.remote([n])
    futures.append(future)

results = ray.get(futures)  
ray.shutdown()

but this leads to

2020-08-25 11:21:17,005	WARNING worker.py:1134 -- WARNING: 29 PYTHON workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some a discussion of workarounds.

that brought me here.

A straightforward solution seems to be removing the call to ray.get from the remote function (making compute an ordinary local function that just chains the tasks):

def compute(x):
    x = a.remote(x)
    x = b.remote(x)
    return c.remote(x)


ray.init()

futures = []
for n in range(500):
    future = compute([n])
    futures.append(future)
    
results = ray.get(futures)    
ray.shutdown()

but that may not be ideal for every use case - sometimes you may need the output of remote tasks inside a remote function.

If using custom resources is the solution, as discussed above (that seems to impact the code less than changing the data flow), maybe it would be good to add a short snippet here showing how that could solve the issue?
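
A minimal sketch of how custom resources might be applied to the compute example above, assuming a hypothetical resource named compute_slot (the name and count are illustrative):

import time
import ray

# Assume the cluster advertises 4 units of a custom resource called "compute_slot".
ray.init(resources={"compute_slot": 4})

@ray.remote
def a(x):
    return x + ["a"]

@ray.remote
def b(x):
    time.sleep(.1)
    return x + ["b"]

@ray.remote
def c(x):
    return x + ["c"]

# Only 4 compute tasks can hold a "compute_slot" at once, so only 4 workers are
# ever blocked in the inner ray.get; the remaining submissions wait in the queue.
@ray.remote(resources={"compute_slot": 1})
def compute(x):
    return ray.get(c.remote(b.remote(a.remote(x))))

results = ray.get([compute.remote([n]) for n in range(500)])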

@Lewisracing

Hi, is this issue improved in the latest Ray versions (e.g. 1.1.0)?

@Jhonathan-Pedroso

Jhonathan-Pedroso commented Mar 6, 2021


It may be naive, but it can help other users. I could solve this problem, which printed warnings in my case, with the following logic (Trainer is my actor) - I think it can easily be reduced to a decorator:

[screenshot: ray_job_handle]

@nirn1973

I am getting the same error with Modin[ray]. I do not call a remote function using Ray explicitly, but I do initialize and configure Ray inside the main/top module, because the program needs extra out-of-core disk space. Any advice on how to solve this?

ray.init(_plasma_directory=plasma_directory,
         include_dashboard=False,
         object_store_memory=20e9,
         _system_config={
             "automatic_object_spilling_enabled": True,
             "max_io_workers": 4,  # More IO workers for remote storage.
             "min_spilling_size": 50 * 1024 * 1024,  # Spill at least 50MB at a time.
             "object_spilling_config": json.dumps(
                 {"type": "filesystem", "params": {"directory_path": object_spilling_path}},
             ),
             "kill_idle_workers_interval_ms": 5000,
             "enable_worker_prestart": False,
         },
         )  # object_store_memory=(2*(2**30))
