
[Bug] Excess memory usage when scheduling tasks in parallel? #20618

chunweiyuan opened this issue Nov 21, 2021 · 14 comments
Labels: core (Issues that should be addressed in Ray Core), core-worker, enhancement (Request for new feature and/or capability), P2 (Important issue, but not time-critical), question (Just a question :))

chunweiyuan commented Nov 21, 2021

Search before asking

  • I searched the issues and found no similar issues.

Ray Component

Ray Core

What happened + What you expected to happen

Running this snippet and monitoring via top, one can watch the RES of the workers (in particular the worker that runs concat) grow with each loop iteration, and then something like this happens:

loop 0
(return_np_array pid=1418250) 
loop 1
loop 2
loop 3
loop 4
loop 5
loop 6
(return_np_array pid=1441310) 
loop 7
loop 8
loop 9
loop 10
loop 11
WARNING worker.py:1228 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 1d0538c7d9f14c66b79926c2d5000a99f43197e5506abbd9 Worker ID: 71104f36ba35b1526dc73547fe5085af4c7872bff1291a0c9621c848 Node ID: eba1deefb36030157a58f448a42dedb48a1bacea22b607f15d301d91 

until the eventual crash.

On the other hand, if one comments out the line assigning ar (and of course the del ar), then the RES of each worker stays below 1G.

I've looked at a couple of other issues about remote-task memory leaks from numpy, but they don't seem to pertain to this problem. And because np.concatenate (and xarray.concat by extension) is irreplaceable for my workflow, there's currently no good way for me to get around this problem.

Versions / Dependencies

OS: Linux 3.10.0-1062.12.1.el7.x86_64
Python: 3.7.9 or 3.7.10
Ray: 1.4.1 or 1.8.0
System resources: 4 threads, 60G memory (part of a node as provided via Sun Grid Engine)
Libraries used: numpy 1.21.1 or 1.19.5

Reproduction script

import numpy as np
import ray
import time

ray.init(num_cpus=4, object_store_memory=50 * 1e9)  # not sure how these specs matter, but they're what I used.

@ray.remote
def return_np_array():  # just return a 1-by-million np array
    return np.arange(1000000).reshape(1, 1000000)

@ray.remote
def concat(array_futures):
    return np.concatenate(ray.get(array_futures), axis=0)

for i in range(20):
    list_of_arrays = [return_np_array.remote() for _ in range(1000)]
    ar = concat.remote(list_of_arrays)
    del list_of_arrays
    del ar
    print(f"loop {i}")
    time.sleep(10)

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
@chunweiyuan added the bug (Something that is supposed to be working; but isn't) and triage (Needs triage: priority, bug/not-bug, and owning component) labels Nov 21, 2021

chunweiyuan commented Nov 23, 2021

Update:

If I define

1.) success = finishing the i-loop without worker failure
2.) n = size of list_of_arrays
3.) st = sleep time in seconds

then

4 cores, 60G mem:
n=1000, st=10 --> failure
n=1000, st=100 --> success
n=900, st=10 --> success
n=900, st=5 --> failure
n=800, st=5 --> success

4 cores, 240G mem:
n=1000, st=10 --> success

Am I right in inferring that this could be either 1.) a memory leak issue, or 2.) a garbage collection efficiency issue?

@chunweiyuan chunweiyuan changed the title [Bug] Remote task memory leak from numpy.concatenate [Bug] Remote task memory leak Nov 24, 2021

chunweiyuan commented Nov 24, 2021

My issue seems similar to #7653, but in my case I can relieve the leak with a longer sleep time, which feels garbage-collection-related.


ericl commented Dec 1, 2021

If you add gc.collect() in the loop / remote functions, does it prevent the leak?

@ericl ericl self-assigned this Dec 1, 2021
@ericl ericl added this to the Core Backlog milestone Dec 1, 2021

chunweiyuan commented Dec 2, 2021

Hi @ericl ,

Here's the new code I tried (just added a few del and gc.collect()):

import gc
import numpy as np
import ray
import time

ray.init(num_cpus=4, object_store_memory=50 * 1e9)  # don't think these matter

@ray.remote
def return_np_array():
    return np.arange(1000000).reshape(1, 1000000)

@ray.remote
def concat(array_futures):
    arrays = ray.get(array_futures)
    for ref in array_futures:
        del ref
    del array_futures
    gc.collect()
    out = np.concatenate(arrays, axis=0)
    for ref in arrays:
        del ref
    del arrays
    gc.collect()
    return out

for i in range(20):
    list_of_arrays = [return_np_array.remote() for _ in range(1000)]
    ar = concat.remote(list_of_arrays)
    del list_of_arrays
    del ar
    gc.collect()
    print(f"loop {i}")
    time.sleep(10)

And here's the output:

loop 0
loop 1
(return_np_array pid=449246) 
loop 2
loop 3
loop 4
loop 5
(return_np_array pid=472376) 
loop 6
loop 7
loop 8
loop 9
loop 10
(return_np_array pid=501903) 
loop 11
2021-12-01 16:32:36,959	WARNING worker.py:1228 -- The autoscaler failed with the following error:
Traceback (most recent call last):
  File "/ihme/code/forecasting/env/rf37/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 423, in run
    self._run()
  File "/ihme/code/forecasting/env/rf37/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 311, in _run
    self.update_load_metrics()
  File "/ihme/code/forecasting/env/rf37/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 232, in update_load_metrics
    request, timeout=60)
  File "/ihme/code/forecasting/env/rf37/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/ihme/code/forecasting/env/rf37/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "{"created":"@1638405156.889125083","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
>

loop 12
(raylet) [2021-12-01 16:32:39,645 E 504876 504876] core_worker.cc:425: Failed to register worker ceab59386659397300b7bc87ecec3efdc7f977861f0b7a4166cac5a8 to Raylet. Invalid: Invalid: Unknown worker
2021-12-01 16:32:40,850	WARNING worker.py:1228 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ccf54d136fed1ef63b9e647455b92903efbedda4fecf2b13 Worker ID: 539eaa03d0417d410e27014a2b2a53cf3cd8f3563ff00377ef87805f Node ID: 2b29de614e12fff55e891bd6f78e5bdd6fefd84439952d2a850cbddd Worker IP address: 10.158.99.140 Worker port: 33707 Worker PID: 448497
2021-12-01 16:32:40,850	WARNING worker.py:1228 -- 3 retries left for task 44af561262ce1bf1ffffffffffffffffffffffff01000000, attempting to resubmit.
loop 13
loop 14
loop 15

Attached are some screen shots from top (loops 3, 5, 7, 10, and 11):

[top screenshots: loop 3, loop 5, loop 7, loop 10, loop 11]

Thank you very much for looking into this!


ericl commented Dec 2, 2021

Hmm, I'll take another look. Another thing to try is running "ray memory" to see what references are still in scope. That should tell you whether it's a reference-counting issue or not, and also the type of reference causing the leak.

@ericl added the P1 (Issue that should be fixed within a few weeks) label and removed the triage label Dec 2, 2021

chunweiyuan commented Dec 3, 2021

Hmm, I'll take another look. Another thing to try is running "ray memory" to see what references are still in scope. That should tell you whether it's a reference-counting issue or not, and also the type of reference causing the leak.

Hi Eric,

Unfortunately the terminal command ray memory doesn't work on either my laptop or our cluster. The dashboard does work, but for some reason the memory tab updates too slowly to be of any use to me, so I wasn't able to drill down to the references.

Barring that, I did an empirical grid search on this issue. I suspect np.concatenate() is just a red herring, and the real issue is simply slow garbage collection.

To start, imagine a smaller-scale version of the above snippet, run on

OS: Linux 3.10.0-1062.12.1.el7.x86_64
Python: 3.7.10
Ray: 1.8.0
System resources: 4 threads, 8G memory (part of a node as provided via Sun Grid Engine)
Libraries used: numpy 1.21.1

import gc
import numpy as np
import ray
import time

ray.init(dashboard_host="0.0.0.0", num_cpus=4, object_store_memory=4 * 1e9)  

@ray.remote
def return_np_array():
    return np.arange(1000000).reshape(1, 1000000)  # ~8MB

@ray.remote
def concat(array_futures):
    arrays = ray.get(array_futures)
    for ref in array_futures:
        del ref
    del array_futures
    gc.collect()
    out = np.concatenate(arrays, axis=0)
    for ref in arrays:
        del ref
    del arrays
    gc.collect()
    return out

for i in range(100):
    list_of_arrays = [return_np_array.remote() for _ in range(100)]  # ~0.8GB
    ar = concat.remote(list_of_arrays)
    del list_of_arrays
    del ar
    gc.collect()
    print(f"loop {i}")
    time.sleep(1)

sleep time = 1s (spill/restore, with an eventual WARNING worker.py:1228 that a task cannot be scheduled):

[dashboard screenshot: Cluster_concat_100_arrays_1s_sleep]

sleep time = 10s (peaceful saturation):

[dashboard screenshot: Cluster_concat_100_arrays_10s_sleep]

So a longer wait time seems to allow worker memory to settle at ~1.6 GB (because np.concatenate doubles the memory footprint from 0.8 GB to 1.6 GB?) without erroring out.
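
As a quick back-of-envelope check of that doubling, here is a sketch assuming numpy's default int64 dtype on Linux:

import numpy as np

arr = np.arange(1000000).reshape(1, 1000000)        # one task's return value
per_array_gb = arr.nbytes / 1e9                     # 8,000,000 bytes ~ 0.008 GB
inputs_gb = 100 * per_array_gb                      # the 100 input arrays: ~0.8 GB
output_gb = 100 * per_array_gb                      # the concatenated result: another ~0.8 GB
print(inputs_gb, output_gb, inputs_gb + output_gb)  # peak ~1.6 GB per loop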

Now, what if, instead of doubling memory footprint via np.concatenate, I simply create a list_of_arrays twice as big, and ray.get() it without concat?

import gc
import numpy as np
import ray
import time

ray.init(dashboard_host="0.0.0.0", num_cpus=4, object_store_memory=4 * 1e9) 

@ray.remote
def return_np_array():
    return np.arange(1000000).reshape(1, 1000000)  # ~8MB

@ray.remote
def get_arrays(array_futures):
    """ray.get() and return the arrays"""
    arrays = ray.get(array_futures)
    for ref in array_futures:
        del ref
    del array_futures
    gc.collect()
    return arrays

for i in range(100):
    list_of_arrays = [return_np_array.remote() for _ in range(200)]  # ~1.6G
    ars = get_arrays.remote(list_of_arrays)
    del list_of_arrays
    del ars
    gc.collect()
    print(f"loop {i}")
    time.sleep(1)

sleep time = 1s (the same spill/restore chaos):

[dashboard screenshot: Cluster_copy_200_arrays_1s_sleep]

sleep time = 10s (peaceful saturation):

[dashboard screenshot: Cluster_copy_200_arrays_10s_sleep]
(Curiously enough, the memory footprint on the workers is twice what I expect now.)

I've also tried the two above snippets on my laptop (Mac OS 11.6, quad-core intel, 8GB) and got more or less the same behavior.

Does Ray have its own garbage collector I could play with?


ericl commented Dec 3, 2021

The following seems to work for me (it ran to completion). I think the issue was that the loop wasn't blocking on the remote task to finish (no ray.get), hence the number of active tasks and the memory usage grow without bound.

import numpy as np
import ray
import time

ray.init(dashboard_host="0.0.0.0", num_cpus=4, object_store_memory=4 * 1e9) 

@ray.remote
def return_np_array():
    return np.arange(1000000).reshape(1, 1000000)  # ~8MB

@ray.remote
def get_arrays(array_futures):
    arrays = ray.get(array_futures)
    return 0

for i in range(100):
    list_of_arrays = [return_np_array.remote() for _ in range(200)]  # ~1.6G
    ars = get_arrays.remote(list_of_arrays)
    ray.get(ars)
    print(f"loop {i}")


chunweiyuan commented Dec 3, 2021

Hi @ericl,

Thanks for the quick reply. Yes, your code ran to completion for me too. And it's interesting that the worker memory footprint on the dashboard is only about 1.8 GB now, compared to ~3.2 GB when I ray.get() in the remote task. I thought whatever the remote task gets would just be zero-copy-mapped from the object store, so the total memory footprint should stay around 1.6 GB. Or perhaps there's some double-counting on the dashboard I'm looking at?

Also, does your example suggest the rule-of-thumb that I should always ray.get() on the driver, concat, and then move on to the next list of arrays? That seems to be a rather limiting constraint on Ray usage, and it wouldn't be too different from a completely serial implementation.


ericl commented Dec 4, 2021

Yeah it is pretty easy to shoot yourself in the foot when using the tasks API directly.

It's a question of optimal scheduling order: in this case, you minimize the memory footprint by scheduling serially (or perhaps by allowing only a limited number of tasks to execute at a time). You can run all of them in parallel, but that seems to cause excessive thrashing in this hardware configuration. (cc @stephanie-wang for further thoughts on memory management here)

@ericl ericl removed their assignment Dec 4, 2021
@ericl added the question (Just a question :)) label and removed the bug and P1 labels Dec 4, 2021
@ericl ericl changed the title [Bug] Remote task memory leak [Bug] Excess memory usage when scheduling tasks in parallel? Dec 4, 2021
@ericl added the enhancement (Request for new feature and/or capability) label Dec 4, 2021
@stephanie-wang (Contributor) commented:

Yeah, Eric's approach of inserting ray.get() or ray.wait() statements to throttle how many tasks are in the system is the recommended approach for now.
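
For illustration, here is a minimal sketch of that throttling pattern applied to the reproduction script; the cap of 2 in-flight concat tasks is a hypothetical choice for this example, not a value recommended in the thread:

import numpy as np
import ray

ray.init(num_cpus=4)

@ray.remote
def return_np_array():
    return np.arange(1000000).reshape(1, 1000000)  # ~8MB

@ray.remote
def concat(array_futures):
    return np.concatenate(ray.get(array_futures), axis=0)

MAX_IN_FLIGHT = 2  # hypothetical cap; tune to the memory actually available
in_flight = []
for i in range(20):
    # Throttle: wait for an earlier concat to finish before submitting more work.
    while len(in_flight) >= MAX_IN_FLIGHT:
        _, in_flight = ray.wait(in_flight, num_returns=1)
    list_of_arrays = [return_np_array.remote() for _ in range(1000)]
    in_flight.append(concat.remote(list_of_arrays))
    print(f"loop {i}")
ray.wait(in_flight, num_returns=len(in_flight))  # drain the remaining tasks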

It might also help if you use varargs to pass array_futures, like ars = get_arrays.remote(*list_of_arrays), so that get_arrays doesn't actually get scheduled until all of its inputs are available. If that allows the script to work, there might be a bug with how we're handling ray.get requests inside get_arrays.

Also, when you say it's failing, do you mean that the driver gets OOM-killed?

@chunweiyuan (Author) commented:

Hi @stephanie-wang and @ericl ,

Thank you very much for your assistance. As a recap, this is more or less the code I tested, given a 4 cores/60GB allotment on a 40 cores/251.7GB node:

import gc
import numpy as np
import ray
import time

ray.init(num_cpus=4, object_store_memory=50 * 1e9)

@ray.remote
def return_np_array():  # just return a 1-by-million np array
    return np.arange(1000000).reshape(1, 1000000)  # 8MB

@ray.remote
def concat(array_futures):
    return np.concatenate(ray.get(array_futures), axis=0)

for i in range(100):
    list_of_arrays = [return_np_array.remote() for _ in range(1000)]  # 8GB, takes ~5.5s to finish
    ar = concat.remote(list_of_arrays)  # doubles mem to 16GB, takes ~11s
    del list_of_arrays
    del ar
    gc.collect()
    print(f"loop {i}")
    time.sleep(10)

If the sleep time is <= 11s, the above code would often "fail" like this:

WARNING worker.py:1228 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: ... Worker ID: ... Node ID: ... Worker IP address: ... Worker port: ... Worker PID: ...

sometimes followed by

OSError: [Errno 12] Cannot allocate memory

with a dashboard screenshot like this:

[dashboard screenshot: Screen Shot 2021-12-08 at 12 44 05 PM]

I think my struggles stem from not clearly understanding how scheduling works, leading to a lot of confusion about what the dashboard says:

1.) The screenshot suggests I have 4 concat()s running simultaneously. Does that mean the workers blew through the creation of the first four list_of_arrays and all ended up busy concatenating? Or does concat() on the screenshot capture the array creation process as well, since that's where I call ray.get()?

2.) If I ray.init() with num_cpus=4, why would there be more than 4 PIDs running at the same time, spill or not?

3.) If I ray.init() with num_cpus=2, I see the WARNING worker.py:1228 message even earlier in the loop iterations.

4.) Which numbers should I sum up to determine my total memory footprint? Do I sum up the nodal RAM + Plasma (25.4 + 37.3 ~ 62.7GB), or do I sum up all the non-idle processes below the nodal RAM count (>~74GB)?

Thank you very much again for attending to this issue. It took me some time to put my thoughts together on this.

Chun

@stephanie-wang (Contributor) commented:

1.) The screenshot suggests I have 4 concat()s running simultaneously. Does that mean the workers blew through the creation of the first four list_of_arrays and all ended up busy concatenating? Or does concat() on the screenshot capture the array creation process as well, since that's where I call ray.get()?

Most likely what is happening is that the concat() tasks are getting scheduled, but then they block in ray.get(array_futures) because those ObjectRefs aren't actually ready yet. That's also why we suggested trying with concat.remote(*array_futures): a task with ObjectRefs as direct arguments won't get scheduled until those ObjectRefs are ready, but a task with ObjectRefs in a list as the argument will get scheduled immediately.
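
For example, a hedged sketch of the two call forms (note that the varargs form also requires defining the remote function with *array_futures and no inner ray.get(), as in the snippet further down this thread):

# List argument: concat is dispatched as soon as a worker is free,
# then blocks inside ray.get(array_futures) waiting for the inputs.
ar = concat.remote(list_of_arrays)

# ObjectRefs unpacked as direct arguments: Ray resolves them first,
# so the task is not dispatched until every input array is ready.
ar = concat.remote(*list_of_arrays)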

2.) If I ray.init() with num_cpus=4, why would there be more than 4 PIDs running at the same time, spill or not?

This can happen because a task can get stuck in ray.get, so Ray starts more worker processes than the available CPUs to avoid deadlock. num_cpus will limit how many tasks are running at the same time, but this can be violated if your tasks have ray.get in them.

3.) If I ray.init() with num_cpus=2, I see the WARNING worker.py:1228 message even earlier in the loop iterations.

Not sure about this one, but the warning is probably because workers are getting OOM-killed.

4.) Which numbers should I sum up to determine my total memory footprint? Do I sum up the nodal RAM + Plasma (25.4 + 37.3 ~ 62.7GB), or do I sum up all the non-idle processes below the nodal RAM count (>~74GB)?

Plasma memory should be a subset of the total RAM per node.

Thank you very much again for attending to this issue. It took me some time to put my thoughts together on this.

Chun

No problem! These concepts are definitely not intuitive...

@chunweiyuan (Author) commented:

Hi @stephanie-wang ,

I just tried what you suggested regarding changing the list arg into direct args, with the following changes to the original code:

@ray.remote
def concat(*array_futures):
    return np.concatenate(array_futures, axis=0)


for i in range(100):
    ...
    ar = concat.remote(*list_of_arrays)
    ...

For me, there doesn't seem to be any practical difference between the two methods. If I provide enough memory for one to work, the other one would work as well. Same thing with failure. I haven't been able to find a situation where one method succeeds and the other fails.

However, I am quite curious about their different behaviors on the dashboard. Here's the direct-args screenshot:

[dashboard screenshot: tot_60g_obj_50_cpu_4_pos_args_success]

versus the list-arg case:

[dashboard screenshot: tot_60g_obj_50_cpu_4_list_arg_success]

As you kindly explained, passing the list of refs to the remote task makes the scheduler treat it as ready and dispatch it, only for it to block on the ray.get(). Once blocked, Ray starts on the next tasks, which keep getting blocked in turn, accumulating a big backlog that it then has to chew through.

That leads to a few questions:

1.) What would be the advantage in the list-arg case? I would think that puts extra burden on the scheduler, and potentially confuses those users who are scheduler noobs (like yours truly).

2.) You mentioned that the number of worker processes can go above num_cpus to avoid deadlock. But are these actual new threads? I'm confused when I see 98 workers scheduled (each showing a finite cpu percentage) on an 80-core machine, and yet I'm not punished for exceeding my cpu quota (4), nor do I see a discernible increase in the load average when I run top.

3.) Are spill workers additional threads? I'd guess that spilling slows a task down, which could lead to a bottleneck in the backlog?

Once again, thank you so much for attending to this.


scv119 commented Sep 1, 2022

@stephanie-wang do you think we should do it in 2.1?

@rkooo567 added the core (Issues that should be addressed in Ray Core) label Dec 9, 2022
@jjyao added the triage (Needs triage: priority, bug/not-bug, and owning component) label Feb 14, 2024
@fishbone added the core-worker and P0 (Issues that should be fixed in short order) labels and removed the triage and Ray-2.1 labels Mar 22, 2024
@jjyao added the P1.5 (Issues that will be fixed in a couple releases; it will be bumped once all P1s are cleared) label and removed the P0 label Apr 1, 2024
@jjyao added the P2 (Important issue, but not time-critical) label and removed the P1.5 label Nov 12, 2024