-
Notifications
You must be signed in to change notification settings - Fork 7.7k
Prototype named actors. #2129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prototype named actors. #2129
Changes from 2 commits
2180f0e
1a86d3f
aefd107
e72f278
a29098a
40d1b03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,34 @@ | ||||||||||||||||||||||||||||||||||
| from __future__ import absolute_import | ||||||||||||||||||||||||||||||||||
| from __future__ import division | ||||||||||||||||||||||||||||||||||
| from __future__ import print_function | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's reserve assertions for checking things that should never actually happen. The errors (in both
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handled. |
||||||||||||||||||||||||||||||||||
| import ray | ||||||||||||||||||||||||||||||||||
| import ray.cloudpickle as pickle | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| This file contains functions intended to implement the named actor | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _calculate_key_(name): | ||||||||||||||||||||||||||||||||||
| return b"Actor:" + str.encode(name) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def get_actor(name): | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add docstrings for these methods with the format used e.g., in Lines 2443 to 2458 in 4584193
|
||||||||||||||||||||||||||||||||||
| worker = ray.worker.get_global_worker() | ||||||||||||||||||||||||||||||||||
| actor_hash = _calculate_key_(name) | ||||||||||||||||||||||||||||||||||
| pickled_state = worker.redis_client.hmget(actor_hash, name) | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just use |
||||||||||||||||||||||||||||||||||
| assert len(pickled_state) == 1, \ | ||||||||||||||||||||||||||||||||||
| "Error: Multiple actors under this name." | ||||||||||||||||||||||||||||||||||
| assert pickled_state[0] is not None, \ | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should raise |
||||||||||||||||||||||||||||||||||
| "Error: actor with name {} doesn't exist".format(name) | ||||||||||||||||||||||||||||||||||
| handle = pickle.loads(pickled_state[0]) | ||||||||||||||||||||||||||||||||||
| return handle | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def register_actor(name, actor_handle): | ||||||||||||||||||||||||||||||||||
| worker = ray.worker.get_global_worker() | ||||||||||||||||||||||||||||||||||
| actor_hash = _calculate_key_(name) | ||||||||||||||||||||||||||||||||||
| assert type(actor_handle) == ray.actor.ActorHandle, \ | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should raise |
||||||||||||||||||||||||||||||||||
| "Error: you could only store named-actors." | ||||||||||||||||||||||||||||||||||
| is_existed = worker.redis_client.hexists(actor_hash, name) | ||||||||||||||||||||||||||||||||||
| assert not is_existed, \ | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should raise |
||||||||||||||||||||||||||||||||||
| "Error: the actor with name={} already exists".format(name) | ||||||||||||||||||||||||||||||||||
| pickled_state = pickle.dumps(actor_handle) | ||||||||||||||||||||||||||||||||||
| worker.redis_client.hmset(actor_hash, {name: pickled_state}) | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a race condition with this approach. It's possible that another worker could add publish a named actor with the same name between the calls to |
||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1906,6 +1906,36 @@ def method(self): | |
| # we should also test this from a different driver. | ||
| ray.get(new_f.method.remote()) | ||
|
|
||
| def testRegisterAndGetActorHandle(self): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we change this to |
||
| # TODO(heyucongtom): We may want to test this from another driver? | ||
| # One viable way might be setting up another ray process here, connecting to the | ||
| # redis_address, sending the objectID back, and compare | ||
|
|
||
| ray.worker.init(num_workers=1) | ||
|
|
||
| @ray.remote | ||
| class Foo(object): | ||
| def method(self): | ||
| pass | ||
|
|
||
| f1 = Foo.remote() | ||
| # Test saving f | ||
| ray.experimental.register_actor("f1", f1) | ||
| # Test getting f | ||
| f2 = ray.experimental.get_actor("f1") | ||
| self.assertEqual(f1._actor_id, f2._actor_id) | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's modify the actor to be something like so that here we can do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handled. |
||
| # Test same name register shall raise error | ||
| with self.assertRaises(AssertionError): | ||
| ray.experimental.register_actor("f1", f2) | ||
|
|
||
| # Test register with wrong object type | ||
| with self.assertRaises(AssertionError): | ||
| ray.experimental.register_actor("f3", 1) | ||
|
|
||
| # Test getting an unexist actor | ||
| with self.assertRaises(AssertionError): | ||
| err = ray.experimental.get_actor("unexisted") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unexisted -> nonexistent |
||
|
|
||
| class ActorPlacementAndResources(unittest.TestCase): | ||
| def tearDown(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like this can all fit on one line without the parentheses