-
Notifications
You must be signed in to change notification settings - Fork 7k
Use different serialization context for each driver. #2406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@robertnishihara, please take a look, thanks. |
|
Test FAILed. |
|
Test PASSed. |
robertnishihara
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this looks pretty good! I left some comments.
python/ray/import_thread.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's convert include_driver to a bool before using it in the if statement below. Also note that there may be some Python 2/3 differences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here include_driver is treated as a string, I don't see the necessity to convert it into a boolean actually. Even if we have a function to do that, essentially it should still base on string parsing/checking. What do you think?
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why pass in driver_hash instead of just using self.task_driver_id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The design is for register_class_for_serialization. While calling it through run_function_on_all_workers, the function will be serialized by pickle. However pickle cannot serialize class ObjectId, so I use a hash instead of task_driver_id, then hash is used in the map to make the behavior consistent.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove all of the calls to __hash__. What are they for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explained as above
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the use case for registering the serializer on other drivers? Is it just for objects that are shared across applications (e.g., objects created by named actors)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, do we need that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need something like this eventually, although I think we may want to add it in later.
Currently, this behavior is not implemented, so perhaps there's no need to introduce it in this PR.
Having every driver register every custom serializer from every other driver seems like a bit too much to me, and we can probably do this in a more targeted way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will remove it at this point.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why local=True? When will these serializers get registered on the workers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to register all the default types locally in order to reduce the load of redis, and it's more efficient to do in this way. Your comment reminds me the conditional statement outside this block, I will remove it. Thanks.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove driver_hash from the arguments. register_custom_serializer is part of the API, and I this seems to be an unnecessary API change since we can just use worker.trask_driver_id, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need it, since on the remote side we call this function in this way, that means the worker instance on remote side is passed in. Then worker.trask_driver_id cannot indicate the correct driver that the current class related to.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure the function that is being called there is not register_custom_serializer, it is register_class_for_serialization, see
Line 2367 in 0cecf6b
| worker.run_function_on_all_workers(register_class_for_serialization) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I knew that. See the link I posted above. It is register_class_for_serialization that being called on remote side, but the parameter (worker) passed in on remote side is the remote worker.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Document this argument. Also, let's rename the argument to something more understandable. How about run_on_other_drivers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document the argument to this function and return value
|
Test PASSed. |
|
Test FAILed. |
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using a hash, we can define
driver_id_bytes = worker.task_driver_id.id()and then here we can do get_serialization_context(ray.ObjectID(driver_id_bytes)).
Then we can get rid of all of the calls to __hash__().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later we can just make ObjectIDs pickleable (which should be pretty simple).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
|
Test PASSed. |
|
Hi @robertnishihara, any other comments? |
|
@surehb I will take a look and get back to you later tonight. |
robertnishihara
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments. It's looking pretty good.
Note that there are some linting errors, and there seems to be some other issue with the tests, so there may be a bug.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since driver_id is a keyword argument, it should be passed in as driver_id=driver_id.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this argument to after local and before worker? I would prefer to avoid this argument since it shouldn't be used by the user and register_custom_serializer is part of the API. However, it's a bit better if the argument is at the end (similar to worker, which is also not used by the user).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This None value is never used, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it here for better readability, people will easily know we have a task_driver_id which could be accessed in this way: work.task_driver_id.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also document the return value.
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dirvers -> drivers
Can you also explain here that this may be useful for objects shared between jobs?
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if driver_id is None:
python/ray/worker.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a period after processing?
|
Test FAILed. |
This reverts commit 32b181e.
|
Test FAILed. |
|
Test PASSed. |
python/ray/worker.py
Outdated
| use_pickle=True, | ||
| local=True, | ||
| driver_id=driver_id, | ||
| class_id="lambda0") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"lambda"
python/ray/worker.py
Outdated
| use_pickle=True, | ||
| local=True, | ||
| driver_id=driver_id, | ||
| class_id="int") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"type"
|
Test FAILed. |
|
Test FAILed. |
|
Test FAILed. |
|
Test FAILed. |
|
Thank you @robertnishihara, I met some issues while using flake8 on my local, will figure out the usage later. |
|
Test FAILed. |
|
Test FAILed. |
|
Test FAILed. |
|
@robertnishihara, the change should be fine now, please help to merge. There are 3 failures in the checks: No.4 looks unrelated to my change; No.1 and No.7 fail for almost all PRs, could you please take a look? |
|
@surehb I just fixed a typo and made some very small changes. Please take a look. |
|
Test FAILed. |
|
@surehb thanks for this fix! Can you also add a test for this functionality? It's a little tricky since it requires starting multiple drivers. There are some examples in |
|
@robertnishihara, sure, will do it tomorrow. |
|
|
||
| if (self.worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE] | ||
| if (run_on_other_drivers == "False" | ||
| and self.worker.mode in [ray.SCRIPT_MODE, ray.SILENT_MODE] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some issues with this if statement. In particular, run_on_other_drivers can be b"False". I think there are other issues as well. See #2769
What do these changes do?
Previously, we used shared serialization context for tasks generated by different drivers. This will cause problems some cases. For example, we have a task which will generate a new class (A) on worker during execution. Worker will try to register A if it fails to serialize it, see here. All the workers will receive the register message and those who have the same driver id will register the A into context. Every thing goes fine for the first time we run a task. However, since the worker runs in a while loop and never exit, so when the second time we run the same thing, (actually it starts from a different driver), the worker already has A registered in its context (by the previous execution) so it will not publish the register message, then the driver will never know how to deserialize it.
In this change:
We introduce a map (DriverId -> SerializationContext) in worker.py, so that we have different serialization context for different driver id. In this way, worker will be able to publish the register message.
PS.
This change has been tested for a couple of days in our cluster, and so far we don't see issues.
As TODO, we will need to remove the context of a driver id once all its tasks finished. This could be done together with GC once we have the 'job' added in.
Related issue number
2165
2288