[core] Memory changes are not as expected when using ray.get() #30615

Open
zyheeeeee opened this issue Nov 23, 2022 · 2 comments
Assignees: stephanie-wang
Labels: core (Issues that should be addressed in Ray Core), P2 (Important issue, but not time-critical), question (Just a question :))

Comments

@zyheeeeee

What happened + What you expected to happen

Recently, we found several unexpected phenomena when using ray.get(). We hope to get an explanation for them.

1. When using ray.get() to fetch the same data twice from another node, the peak memory usage grows to twice the size of the data, both times.

We used Prometheus to record the memory curve over the whole process, as shown below.

[Figure: total memory usage over time, one curve per node]

The two curves record the total memory usage of the two nodes, respectively.

My question is why the memory at points C and D in the figure grows to twice the size of the data and then drops back down to the size of the data.

In my opinion, the first ray.get() already fetched the data from the remote node into the local plasma store, so the second call should reuse that memory rather than fetch the data again.

@ray.remote(resources={"Resource1": 1})
def task3(params):
    print(f"start task3 {time.time()}")
    time.sleep(20)

@ray.remote(resources={"Resource0": 1})
def send():
    print(f"start create {time.time()}")
    symbol_components = []
    
    params = {
        "param1": nn.Parameter(torch.rand(1024, 409600), requires_grad=False),
        "param2": nn.Parameter(torch.rand(1024, 409600), requires_grad=False),
    }

    print(f"end create {time.time()}")
    time.sleep(20)

    param_ref = ray.put(params)
    print(f"end put {time.time()}")
    time.sleep(20)

    print(f"start call {time.time()}")

    ref = task3.remote(param_ref)
    print(ray.get(ref), time.time())

    print(f"start call2 {time.time()}")
    time.sleep(20)

    ref = task3.remote(param_ref)
    print(ray.get(ref), time.time())

2. We use ray.get() in an actor method to fetch data from another node. After this method returns, the data that was copied from the remote node into the local plasma store is not released, even though we never use it again.

We also found that even when plasma triggers spilling, the copied data is still kept and not released.

When and how will this copied data be released?


@ray.remote(resources = {"Resource0":8})
class Worker1:
    def __init__(self):
        self.value = 0
        
    def put_data(self):
        param = nn.Parameter(torch.rand(1024, 4096),
                            requires_grad=False)
        ref_ray = []
        time_start_put = time.time()
        for i in range(global_nums):
            ref = ray.put(param)
            ref_ray.append(ref)
        ray_put_time = time.time() - time_start_put
        return ref_ray
    
    def get_data(self,ref_ray):
        time_start_get = time.time()
        for i in range(len(ref_ray)):
            ref = ray.get(ref_ray[i])
        ray_get_time = time.time() - time_start_get
        return ray_get_time
    
@ray.remote(resources = {"Resource1":8})
class Worker2:
    def __init__(self):
        self.value = 0

    def put_data(self):
        param = nn.Parameter(torch.rand(1024, 4096),
                            requires_grad=False)  # 4M*4=16MB
        ref_ray = []
        time_start_put = time.time()
        for i in range(global_nums):
            ref = ray.put(param)
            ref_ray.append(ref)
        ray_put_time = time.time() - time_start_put
        return ref_ray
    
    def get_data(self, ref_ray):
        time_start_get = time.time()
        for i in range(len(ref_ray)):
            ray.get(ref_ray[i])
        ray_get_time = time.time() - time_start_get
        return ray_get_time
    
def ray_multinode_put_get_test():
    ray.init(address="ray://******:10001")
    total_start_time = time.time()
    
    worker1 = Worker1.remote()
    worker2 = Worker2.remote()
    
    ref_ray1 = worker1.put_data.remote()
    ref_ray2 = worker2.put_data.remote()
    
    list1 = ray.get(ref_ray1)
    list2 = ray.get(ref_ray2)
    
    print("len:", len(list2))
    num = 20
    for i in range(num):
        get_time1 = worker1.get_data.remote(list2[(int)(len(list2)/num * i):(int)(len(list2)/num * (i+1))])
        ray.get(get_time1)
        print("end to sleep")
        time.sleep(30)
        print("start again")
    
    total_end_time = time.time() - total_start_time
    print("total costs:{}".format(total_end_time))
    ray.shutdown()
    
  
if __name__ == "__main__":
    ray_multinode_put_get_test()

Versions / Dependencies

Ray version 1.13.0

Reproduction script

For the test code, please see the description above.

Issue Severity

High: It blocks me from completing my task.

@zyheeeeee zyheeeeee added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Nov 23, 2022
@sven1977 sven1977 added P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Nov 23, 2022
@AndyBug0

I also ran into this problem. Suppose I keep an ObjectRef in an actor A and send it to another actor B (where A and B are on different nodes) so that B can pull the object from plasma into its process memory using ray.get(). What does plasma do after actor B releases the ObjectRef? Does plasma release the object? If not, how can I release the object from the plasma store on actor B's node while keeping it in the plasma store on actor A's node? A sketch of the setup is below.
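
A minimal sketch of this scenario, with hypothetical actor names and everything running on one node for brevity (in the real case A and B would be placed on different nodes):

```python
import numpy as np
import ray

ray.init()

@ray.remote
class A:
    def __init__(self):
        # The primary copy lives in the plasma store of the node running A.
        self.ref = ray.put(np.zeros(10**7))

    def get_ref(self):
        return self.ref

@ray.remote
class B:
    def consume(self, refs):
        # Wrapping the ObjectRef in a list prevents Ray from automatically
        # dereferencing it when the argument is passed to the task.
        value = ray.get(refs[0])   # copies the object into B's node / heap
        del value                  # B is done with the object here
        return "done"

a = A.remote()
b = B.remote()
ref = ray.get(a.get_ref.remote())
print(ray.get(b.consume.remote([ref])))
```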

@rkooo567 rkooo567 added triage Needs triage (eg: priority, bug/not-bug, and owning component) core Issues that should be addressed in Ray Core and removed P2 Important issue, but not time-critical labels Nov 25, 2022
@stephanie-wang stephanie-wang added question Just a question :) and removed bug Something that is supposed to be working; but isn't labels Nov 29, 2022
@stephanie-wang stephanie-wang self-assigned this Nov 29, 2022
@stephanie-wang
Contributor

Great question. I think part of the problem here is that torch tensors are not zero-copy deserializable, so every time you call ray.get, it actually doubles the memory usage: one copy stays in shared memory and the other is the deserialized copy in the worker's heap. This issue is tracked here. You could try your script again with a numpy array (which is zero-copy deserializable) and see if you have the same issue.
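
A minimal sketch of that comparison (single-node, sizes purely illustrative): numpy arrays come back from ray.get() as read-only views backed by shared memory, while torch tensors are deserialized into the worker's heap each time.

```python
import numpy as np
import torch
import ray

ray.init()

np_ref = ray.put(np.random.rand(1024, 4096))   # float64, ~32 MB
t_ref = ray.put(torch.rand(1024, 4096))        # float32, ~16 MB

np_arr = ray.get(np_ref)   # zero-copy: a read-only view backed by plasma
tensor = ray.get(t_ref)    # not zero-copy: a fresh copy in this process's heap

# The zero-copy numpy result is read-only because it points into shared memory.
print(np_arr.flags.writeable)   # expected: False
```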

Here is the condition for when object copies can get released from plasma:

  1. No worker is actively using it (via ray.get or task arg). For zero-copy deserializable objects, the worker also has to release any refs to the value returned by ray.get. For non-zero-copy objects, the object can get released as soon as the worker finishes the ray.get.
  2. For the original copy of the object only (i.e. the driver's node in your first script), the copy can be released once all ObjectRefs have been deleted or once it has been spilled.

Note that these are just the conditions for release; the actual release happens when:

  1. There is memory pressure on the node.
  2. All ObjectRefs have gone out of scope.
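
As a small illustration of the second condition (all ObjectRefs going out of scope), assuming a plain single-node session:

```python
import numpy as np
import ray

ray.init()

ref = ray.put(np.zeros(10**7))   # the primary copy is pinned while `ref` exists
value = ray.get(ref)             # the worker now holds a reference to the value

del value   # release the worker's reference to the ray.get() result
del ref     # once every ObjectRef is gone, the copy becomes eligible for release;
            # the actual eviction may still wait for memory pressure on the node
```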

@hora-anyscale hora-anyscale added P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Dec 5, 2022