[core] Support detached/GCS owned objects #12635
Comments
I have a concern about this method. If we have a global actor per group of tasks, and it serves this purpose, will it just work? Will it be simpler? Do we need to support moving ownership for this?
I agree binding the lifetime of an object to an actor might be the way to go. The way I see it, there are probably a few use cases. Perhaps we could create the following API? Perhaps we don't need to give objects an explicit name, just allow their lifetime to be changed:
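For illustration, a minimal sketch of what such a lifetime-changing call could look like (the `set_owner` function and its signature are hypothetical, not an existing Ray API):

```python
import ray

ray.init()

ref = ray.put({"weights": [1, 2, 3]})   # owned by this worker by default
cache = ray.get_actor("shared_cache")   # assumes a named detached actor exists

# Hypothetical API: re-bind the object's lifetime so it fate-shares with
# the actor (or the GCS) instead of the worker that called ray.put().
ray.experimental.set_owner(ref, owner=cache)
```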
Btw, actually doing ownership transfer is pretty hard, since the owner metadata is propagated inline with the object reference. You'd need to add some kind of transfer table to handle that. Assuming the number of objects that require transfer is quite small, it would be easier to instead only allow ownership to be transferred to the GCS. This is simpler since it means owner lookups can always fall back to the GCS on owner failure.
I think my concern here is two things: 1) how to manage all the detached variables; 2) how to do better isolation between groups of jobs.

On (1), how to manage all the detached variables: here I assume one job to be a [...].

On (2), how to do better isolation between groups of jobs: I thought of introducing job groups or something else, but then it looks like one driver job with several sub-jobs. So sub-jobs (separate .py files) can run in the driver job (main .py file), and the driver job will have a reference to the shared data, which will work. I think something is missing here in my brain. Please share more details about this.
Another user asks about this in #12748.
This isn't something we need to worry about though--- if the user pins too much, it's their problem. With an autoscaling cluster, we can also spill / add more nodes to handle this case.
One way is that object refs can be transferred through a named actor, as in the sketch below. Perhaps we can try this to see if it satisfies the use cases before adding another API.
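A minimal sketch of that workaround with the existing API (the actor and key names are illustrative):

```python
import ray

# A named, detached actor that holds ObjectRefs so other workers/jobs can
# look them up by name.
@ray.remote
class RefHolder:
    def __init__(self):
        self._refs = {}

    def put(self, key, ref_list):
        # The ref is wrapped in a list so Ray passes the ObjectRef itself
        # rather than resolving it to its value before the method runs.
        self._refs[key] = ref_list[0]

    def get(self, key):
        return self._refs[key]

ray.init()
holder = RefHolder.options(name="ref_holder", lifetime="detached").remote()
ref = ray.put("shared payload")
ray.get(holder.put.remote("payload", [ref]))

# Elsewhere: look the actor up by name and fetch the ref.
holder2 = ray.get_actor("ref_holder")
shared_ref = ray.get(holder2.get.remote("payload"))
print(ray.get(shared_ref))  # "shared payload"
```

Note that this only shares the reference; as discussed below, the object itself still fate-shares with the worker that created it.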
Btw, in the RayDP/Spark on Ray use case, the variable just needs to be transferred within the job, so there isn't a multiple-jobs use case yet. For caching use cases, the variables can be transferred to named actors that are global across many jobs. So I think changing the lifetime is sufficient for both of these cases, but perhaps there are others that require more functionality.
I think I misunderstood some parts. So let me just summarize it here and please let me know if I was wrong:

- Having a named actor shared across jobs to exchange data between them
- If the user wants to do cross-job communication, pass the ownership to the actor
- Multiple eviction policies can be supported here (detached, LRU, binding it to another job/actor)
Another thing I'd like clarified: what's a job? Is it a Python script? Some demo code about what needs to be achieved would help me understand it.
I'm thinking of a cluster where multiple users want to submit jobs. If someone uses too much and we have no way to group the related variables, it's going to be a mess. But if it's done through named actors and different users' jobs have different named actors, then it's OK. We can limit memory usage through these actors to protect the cluster.
That sounds right to me. A couple clarifications: a Python script/driver defines a job, since we generate a unique job on connecting to a cluster. So in the Spark on Ray case, all actors would run within a single job.

Re: memory usage, the isolation here is not very well developed, but note this is a general problem in Ray and not something specific to detached objects. Hence, it is out of scope of this feature itself.
Both @ericl's and @ahbone's opinions are impressive. As @ahbone says, management of detached objects shouldn't be handed to the user; it would clutter the user's code. Yes, using a detached named actor to emulate 'detached objects' is one method, but it's too complicated, and implicit. As a Ray novice, I spent hours searching the docs/issues/discussions for a way to 'detach' a plasma object, and failed. In my opinion, [...] Hope my ideas will help.
@DonYum Thanks for your ideas here, and I agree this is a useful feature to support. @ericl For RayDP, as you mentioned:

In this way, it looks like all objects can be passed back to the driver. Why would detached objects help here?
Perhaps we don't allow "true" global objects in the first iteration, to avoid the problems mentioned above. If an object can only be transferred to another actor, that would solve the RayDP use case (single job) and also allow other use cases that span multiple jobs.

The issue is that the objects are still owned by the producing worker (the one that calls ray.put()). If that worker is killed, the object is lost, even if the reference was passed back to the driver.
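A small sketch of that failure mode (the actor and payload are illustrative):

```python
import ray
from ray.exceptions import RayError

@ray.remote
class Producer:
    def make(self):
        # The returned object is owned by this actor's worker process.
        return [ray.put("big payload")]

ray.init()
producer = Producer.remote()
(ref,) = ray.get(producer.make.remote())

ray.kill(producer)  # the owner dies
try:
    ray.get(ref)
except RayError as e:
    # Fails with an owner-died/object-lost error, even though the driver
    # still holds the reference.
    print(f"object lost: {e}")
```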
Thanks for the explanation. I'll spend some time checking how to support this version.
@ericl, sorry for not making progress for a while; I recently picked this up again. I'm wondering whether this will make RayDP's case and other cases work: since the value stored is immutable, whenever we need an updated value, we need to call [...].

We don't need to take care of this, since [...]. Please let me know whether it's the correct direction to go or not.
Is this the same thing as [...]?
I double-checked your sample code, and I realized that I misunderstood it. It's about returning [...].

Here, if we give [...], and here, we need to update [...].

There will be a duplicate; otherwise, there is no extra memory cost. Correct me if I was wrong here. So most of the updates should be in [...].

Maybe some step is hard to do, so please give me some comments.
@ericl Here is my plan to support this: [...]

So the overall flow might look like: [...]

In this way, the objects' life-cycle stays the same as before, we avoid a duplicate low-level copy, and we provide the fundamental support for ownership sharing. Maybe ownership sharing is enough for now; we can call delete on the first owner to support ownership transferring. If you think it's good to go, I'll start implementing it.
Hmm, this seems a little complicated; I'm not sure we should be changing plasma here or introducing entry sharing as a concept. How about we outline a couple of different design options before coming to a conclusion? I was originally thinking this would be a simple change, where the object is simply not deleted when the owner dies. If it turns out to be much harder, then the feature might not be worth it. Btw: there is a new ownership-based object directory that might conflict with this change: #12552
@ericl The sharing info tells the lower-level store that it doesn't need to create an entry; instead, it can share the existing one. I haven't figured out how to ingest this sharing info in the GCS or the new ownership-based object directory.

I thought about this before. But if this node (A) passes the obj_ref to another node (B), B needs to download this info from the owner (X), which is dead. So B also needs the information that the obj_ref is owned by X but pinned in B. If we want to support pinning an obj locally with visibility limited to the current node, it'll be a simpler change: we just don't delete the pinned obj even if the owner is dead.
You can fall back to the GCS in this case, right? The owner can respond that the object is owned by the GCS if it's up; if it's down, the caller can auto-retry at the GCS just in case.
@ericl Maybe I missed some parts of the system. I thought the owner needs to be up to access that object.
Right, but we can change the metadata resolution protocol to fall back to querying the GCS if X is down. So in this case, [...]

Btw, another related use case is that we might consider transferring object ownership to the GCS if it's ever spilled to external storage such as S3 (cc @rkooo567 @clarkzinzow). This would allow the object to be retrieved even if the owner has been killed for some reason (might be useful for large-scale distributed shuffle use cases). In this case, we would want to automatically transfer ownership to the GCS on spill without any API call from the user.
But even if it falls back to querying the GCS, it still can't get the actual data since the owner is dead. I feel we are talking about different scenarios.

Is this one also case 2?
If we transfer the metadata to the GCS this would fix that, right? The protocol I'm imagining is:

Original owner: [...]

Resolver: [...]
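Based on the fallback described earlier in the thread, the resolver side might look something like this sketch (all names are hypothetical, not actual Ray internals):

```python
class OwnerUnreachable(Exception):
    """Hypothetical error: the owner worker did not respond."""

def resolve_object(object_id, owner_client, gcs_client):
    """Sketch: resolve object metadata, falling back to the GCS."""
    try:
        reply = owner_client.get_metadata(object_id)
        if reply.get("owned_by_gcs"):
            # The owner is up and reports that ownership was transferred.
            return gcs_client.get_metadata(object_id)
        return reply
    except OwnerUnreachable:
        # Owner is down: auto-retry at the GCS in case ownership was
        # transferred (e.g., on spill) before the owner died.
        return gcs_client.get_metadata(object_id)
```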
This seems like a narrower case than general ownership transfer to the GCS. Note that nodes don't own objects; worker processes within the node own objects. The problem here is not a node dying, it's the ability to kill a worker actor.

Hmm, I don't quite get this one; once the owner is dead, it will never come back, right?

I think it's neither; it's transferring ownership to the GCS.
@ericl Thanks for the clarification. It looks like we are talking about the same problem; the only difference is who should own that object. Let me summarize our ideas below, and please correct me if I was wrong.

First one: [...]

Things that need updating: [...]

Second one: [...]

Things that need updating: [...]

My opinion on the pros and cons: [...]

I feel the complexity of transferring ownership to the GCS comes from object management, which looks different from before. Probably it's OK. The fallback can cause some performance issues, but that might be OK depending on the use case.
Hmm, I see; basically you're proposing a fast logical copy which generates a new object id. @stephanie-wang any thoughts on this vs. ownership transfer? We could potentially also take a copy approach to spilling. Does it make sense to investigate the difficulty of these two approaches more first?
@iycheng Shouldn't a fourth solution be considered which simply treats an ObjectRef like a shared_ptr? I.e., nothing is needed from the user; as long as one ObjectRef exists, the object will be available. This will probably imply moving objects to the GCS if the owner dies, plus additional effort to get the ref-counting right. As mentioned before, this seems to be the only fully composable option.
@fschlimb The second one is almost identical to the one you mentioned, except that making it the default for all variables introduces extra performance cost. So somewhere, we need some kind of call to tell the system it's a shared_ptr, not a unique_ptr. In the second solution, the caller does this. Or we could make the callee do this kind of work: return an ObjectRef with the 'shared bit' set to true, and the caller side will just handle it automatically. At the API level we can do this easily if we support the second proposal. One is explicitly shared and the other is shared by default.
@iycheng Yes, I agree, these are closely related. The point I am trying to make is that I don't see how 2 (or 1 and 3) is (are) composable. Without shared being the default, how would different packages orchestrated into a larger program synchronize? I see only 2 practical options: 1. all producers explicitly declare all outgoing refs shared, or 2. the consumers always convert all incoming ObjectRefs to shared. Both are pretty fragile/error-prone and cumbersome.
@fschlimb I'm not an expert in Ray's usage patterns, so correct me if I'm wrong. From my understanding, the [...]. One difference between (2) and (4) is that (2) always generates a new object id while (4) reuses the old one. The first one fits into the current framework easily, and we'll only have overhead for the objects that need it. Making it the default is definitely one way to do this at the API level, but then we'd need to revisit the reference-counting algorithm and memory management: shared objects would be pinned on their node to avoid eviction, and if we make that the default for everything, it'll waste a lot of memory.
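To make the two directions concrete, a short hypothetical sketch (neither behavior exists in Ray today; `share` is an invented name):

```python
import ray

ray.init()
ref = ray.put(b"payload")

# (2) Explicit sharing: an opt-in call that creates a new, GCS-backed
# object id for the same data; only these objects pay the pinning cost.
shared_ref = ray.experimental.share(ref)  # hypothetical API

# (4) Sharing by default: every ObjectRef already behaves like a
# shared_ptr (the object lives while any reference exists, with the GCS
# taking over on owner death), so no call is needed, but every object
# pays the bookkeeping/pinning cost whether or not it is ever shared.
```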
Hi, is this issue still active? Was there any progress? Thanks
Hi all, first, thank you all for working on this issue! I am from the raydp team. This feature is very important in our project, so we want to settle on a solution and implement it.

I have read the proposals. @ericl @iycheng Is there any progress since then? All of them work for us, but some are harder to implement than others. So far the proposals assume objects are already in plasma, but is it possible for a worker to put an object into plasma on behalf of another worker? In other words, we could add an API like ray.put(obj, owner=another_worker). Here another_worker could be a global named worker. Changing the ownership info in the first place might save us from having inconsistent copies. I'm not an expert on Ray core, so please correct me if I'm wrong.

Thanks
We have a public design doc here on the future approach, which is in progress: https://docs.google.com/document/d/1Immr9m049Ikj8Pq3G06Csm8sHyhBvzpWENGB5lHPbOY/edit?usp=drivesdk
I've updated the description given the current state of things. The proposal is to extend the ray.put-with-owner API to allow GCS-owned objects. In the future, we can then enable automatic transfer to the GCS.
You mean the [...]?
Only the metadata ownership would be handled by the GCS--- everything else, including ref counting, remains the same.
Overview
In the current ownership model, all created objects are owned by some worker executing the job. This means, however, that when a job exits, all the objects it created become unavailable.
In certain cases it is desirable to share objects between jobs (e.g., a shared cache), without creating objects explicitly from a detached actor.
We currently have an internal API that allows the owner of an object created with ray.put() to be assigned to a specific actor, e.g.:
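A minimal sketch of that call (assuming a named detached actor serves as the owner; the variable names are illustrative):

```python
import ray

ray.init()
data = {"key": "value"}
owner = ray.get_actor("global_owner")  # assumes a named detached actor exists

# The object now fate-shares with `owner` instead of the current worker.
ref = ray.put(data, _owner=owner)
```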
The object created will fate-share with that actor instead of the current worker process, which means it's already possible to create global objects by creating a named detached actor and setting the owner to that actor.
However, it would be nice to support
ray.put(data, _owner="global")
to avoid the need for that hack, and allow the object to be truly HA (e.g., tracked durably in HA GCS storage).