-
Notifications
You must be signed in to change notification settings - Fork 7.8k
Prototype named actors. #2129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prototype named actors. #2129
Changes from 5 commits
2180f0e
1a86d3f
aefd107
e72f278
a29098a
40d1b03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,59 @@ | ||||||||||||||||||||||||||||||||||
| from __future__ import absolute_import | ||||||||||||||||||||||||||||||||||
| from __future__ import division | ||||||||||||||||||||||||||||||||||
| from __future__ import print_function | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's reserve assertions for checking things that should never actually happen. The errors (in both
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handled. |
||||||||||||||||||||||||||||||||||
| import ray | ||||||||||||||||||||||||||||||||||
| import ray.cloudpickle as pickle | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def _calculate_key(name): | ||||||||||||||||||||||||||||||||||
| """Generate a Redis key with the given name. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| name: The name of the named actor. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| The key to use for storing a named actor in Redis. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| return b"Actor:" + name.encode("ascii") | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def get_actor(name): | ||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add docstrings for these methods with the format used e.g., in Lines 2443 to 2458 in 4584193
|
||||||||||||||||||||||||||||||||||
| """Get a named actor which was previously created. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| If the actor doesn't exist, an exception will be raised. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| name: The name of the named actor. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||
| The ActorHandle object corresponding to the name. | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| worker = ray.worker.get_global_worker() | ||||||||||||||||||||||||||||||||||
| actor_hash = _calculate_key(name) | ||||||||||||||||||||||||||||||||||
| pickled_state = worker.redis_client.hget(actor_hash, name) | ||||||||||||||||||||||||||||||||||
| if pickled_state is None: | ||||||||||||||||||||||||||||||||||
| raise ValueError("The actor with name={} doesn't exist".format(name)) | ||||||||||||||||||||||||||||||||||
| handle = pickle.loads(pickled_state) | ||||||||||||||||||||||||||||||||||
| return handle | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def register_actor(name, actor_handle): | ||||||||||||||||||||||||||||||||||
| """Register a named actor under a string key. | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||
| name: The name of the named actor. | ||||||||||||||||||||||||||||||||||
| actor_handle: The actor object to be associated with this name | ||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||
| worker = ray.worker.get_global_worker() | ||||||||||||||||||||||||||||||||||
| if not isinstance(actor_handle, ray.actor.ActorHandle): | ||||||||||||||||||||||||||||||||||
| raise TypeError("The actor_handle argument must be an ActorHandle " | ||||||||||||||||||||||||||||||||||
| "object.") | ||||||||||||||||||||||||||||||||||
| actor_hash = _calculate_key(name) | ||||||||||||||||||||||||||||||||||
| pickled_state = pickle.dumps(actor_handle) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| # Add the actor to Redis if it does not already exist. | ||||||||||||||||||||||||||||||||||
| updated = worker.redis_client.hsetnx(actor_hash, name, pickled_state) | ||||||||||||||||||||||||||||||||||
| if updated == 0: | ||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||
| "Error: the actor with name={} already exists".format(name)) | ||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1906,6 +1906,44 @@ def method(self): | |
| # we should also test this from a different driver. | ||
| ray.get(new_f.method.remote()) | ||
|
|
||
| def testRegisterAndGetNamedActors(self): | ||
| # TODO(heyucongtom): We should test this from another driver. | ||
| ray.worker.init(num_workers=1) | ||
|
|
||
| @ray.remote | ||
| class Foo(object): | ||
| def __init__(self): | ||
| self.x = 0 | ||
|
|
||
| def method(self): | ||
| self.x += 1 | ||
| return self.x | ||
|
|
||
| f1 = Foo.remote() | ||
| # Test saving f. | ||
| ray.experimental.register_actor("f1", f1) | ||
| # Test getting f. | ||
| f2 = ray.experimental.get_actor("f1") | ||
| self.assertEqual(f1._actor_id, f2._actor_id) | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's modify the actor to be something like so that here we can do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handled. |
||
| # Test same name register shall raise error. | ||
| with self.assertRaises(ValueError): | ||
| ray.experimental.register_actor("f1", f2) | ||
|
|
||
| # Test register with wrong object type. | ||
| with self.assertRaises(TypeError): | ||
| ray.experimental.register_actor("f3", 1) | ||
|
|
||
| # Test getting a nonexistent actor. | ||
| with self.assertRaises(ValueError): | ||
| ray.experimental.get_actor("nonexistent") | ||
|
|
||
| # Test method | ||
| self.assertEqual(ray.get(f1.method.remote()), 1) | ||
| self.assertEqual(ray.get(f2.method.remote()), 2) | ||
| self.assertEqual(ray.get(f1.method.remote()), 3) | ||
| self.assertEqual(ray.get(f2.method.remote()), 4) | ||
|
|
||
|
|
||
| class ActorPlacementAndResources(unittest.TestCase): | ||
| def tearDown(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. ok I see that the test failures without it.