From 2180f0e4ff5373c89514c83034ef98c123323501 Mon Sep 17 00:00:00 2001 From: Yucong He Date: Wed, 23 May 2018 13:13:40 -0700 Subject: [PATCH 1/6] Clean up and move named_actor implementations to the experimental folder. Add tests --- python/ray/experimental/__init__.py | 5 +++- python/ray/experimental/named_actors.py | 37 +++++++++++++++++++++++++ test/actor_test.py | 30 ++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 python/ray/experimental/named_actors.py diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index e9697eee48d1..042b855e8b0d 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -7,10 +7,13 @@ flush_redis_unsafe, flush_task_and_object_metadata_unsafe, flush_finished_tasks_unsafe, flush_evicted_objects_unsafe, _flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard) +from .named_actors import ( + get_actor, register_actor +) __all__ = [ "TensorFlowVariables", "flush_redis_unsafe", "flush_task_and_object_metadata_unsafe", "flush_finished_tasks_unsafe", "flush_evicted_objects_unsafe", "_flush_finished_tasks_unsafe_shard", - "_flush_evicted_objects_unsafe_shard" + "_flush_evicted_objects_unsafe_shard", "get_actor", "register_actor" ] diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py new file mode 100644 index 000000000000..f274c87adb7a --- /dev/null +++ b/python/ray/experimental/named_actors.py @@ -0,0 +1,37 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray +import ray.cloudpickle as pickle +""" +This file contains functions intended to implement the named actor +""" + + +# global_worker = ray.worker.get_global_worker() + +def _calculate_key_(name): + return b"Actor:" + str.encode(name) + +def get_actor(name): + worker = ray.worker.get_global_worker() + actor_hash = _calculate_key_(name) + pickled_state = worker.redis_client.hmget(actor_hash, name) + assert len(pickled_state) == 1, \ + "Error: Multiple actors under this name." + assert pickled_state[0] is not None, \ + "Error: actor with name {} doesn't exist".format(name) + handle = pickle.loads(pickled_state[0]) + return handle + +def register_actor(name, actor_handle): + worker = ray.worker.get_global_worker() + actor_hash = _calculate_key_(name) + assert type(actor_handle) == ray.actor.ActorHandle, \ + "Error: you could only store named-actors." + is_existed = worker.redis_client.hexists(actor_hash, name) + assert not is_existed, \ + "Error: the actor with name={} already exists".format(name) + pickled_state = pickle.dumps(actor_handle) + worker.redis_client.hmset(actor_hash, {name: pickled_state}) diff --git a/test/actor_test.py b/test/actor_test.py index 594a82f67b7c..d3ad4f5ce1a2 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1906,6 +1906,36 @@ def method(self): # we should also test this from a different driver. ray.get(new_f.method.remote()) + def testRegisterAndGetActorHandle(self): + # TODO(heyucongtom): We may want to test this from another driver? + # One viable way might be setting up another ray process here, connecting to the + # redis_address, sending the objectID back, and compare + + ray.worker.init(num_workers=1) + + @ray.remote + class Foo(object): + def method(self): + pass + + f1 = Foo.remote() + # Test saving f + ray.experimental.register_actor("f1", f1) + # Test getting f + f2 = ray.experimental.get_actor("f1") + self.assertEqual(f1._actor_id, f2._actor_id) + + # Test same name register shall raise error + with self.assertRaises(AssertionError): + ray.experimental.register_actor("f1", f2) + + # Test register with wrong object type + with self.assertRaises(AssertionError): + ray.experimental.register_actor("f3", 1) + + # Test getting an unexist actor + with self.assertRaises(AssertionError): + err = ray.experimental.get_actor("unexisted") class ActorPlacementAndResources(unittest.TestCase): def tearDown(self): From 1a86d3f3e23afa1f72cc4f2723c7beec4e2b76eb Mon Sep 17 00:00:00 2001 From: Yucong He Date: Wed, 23 May 2018 13:14:30 -0700 Subject: [PATCH 2/6] clean up --- python/ray/experimental/named_actors.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index f274c87adb7a..c3d1f14be05d 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -8,9 +8,6 @@ This file contains functions intended to implement the named actor """ - -# global_worker = ray.worker.get_global_worker() - def _calculate_key_(name): return b"Actor:" + str.encode(name) From aefd1077552550efe2fee27b92ac2b0029917cf3 Mon Sep 17 00:00:00 2001 From: Yucong He Date: Wed, 23 May 2018 16:29:51 -0700 Subject: [PATCH 3/6] Modify error type, add docstrings, fix (potential?) bug for pickling an actorhandle created by pickling. --- python/ray/actor.py | 3 +- python/ray/experimental/__init__.py | 4 +-- python/ray/experimental/named_actors.py | 44 ++++++++++++++++++------- test/actor_test.py | 23 +++++++++---- 4 files changed, 52 insertions(+), 22 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index ea29780024fa..180b903ec33b 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -869,7 +869,8 @@ def _serialization_helper(self, ray_forking): _ray_actor_creation_dummy_object_id.id(), "actor_method_cpus": self._ray_actor_method_cpus, "actor_driver_id": self._ray_actor_driver_id.id(), - "previous_actor_handle_id": self._ray_actor_handle_id.id(), + "previous_actor_handle_id": self._ray_actor_handle_id.id() + if self._ray_actor_handle_id else None, "ray_forking": ray_forking } diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 042b855e8b0d..fa632e46393d 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -7,9 +7,7 @@ flush_redis_unsafe, flush_task_and_object_metadata_unsafe, flush_finished_tasks_unsafe, flush_evicted_objects_unsafe, _flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard) -from .named_actors import ( - get_actor, register_actor -) +from .named_actors import (get_actor, register_actor) __all__ = [ "TensorFlowVariables", "flush_redis_unsafe", diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index c3d1f14be05d..78e74739b16a 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -8,27 +8,47 @@ This file contains functions intended to implement the named actor """ + def _calculate_key_(name): + """Generate a Redis key with the given name + Args: + name: THe name of the named-actor + """ return b"Actor:" + str.encode(name) + def get_actor(name): + """Get a named actor which is previously created. If the actor + doesn't exist, it will return an error. + Args: + name: The name of the named-actor. + Returns: + The ActorHandle object corresponding to the name. + """ worker = ray.worker.get_global_worker() actor_hash = _calculate_key_(name) - pickled_state = worker.redis_client.hmget(actor_hash, name) - assert len(pickled_state) == 1, \ - "Error: Multiple actors under this name." - assert pickled_state[0] is not None, \ - "Error: actor with name {} doesn't exist".format(name) - handle = pickle.loads(pickled_state[0]) + pickled_state = worker.redis_client.hget(actor_hash, name) + if pickled_state is None: + raise ValueError("The actor with name={} doesn't exist".format(name)) + handle = pickle.loads(pickled_state) return handle + def register_actor(name, actor_handle): + """Register a named actor under a string key. + Args: + name: The name of the named-actor. + actor_handle: The actor object to be associated with this name + """ worker = ray.worker.get_global_worker() + if type(name) != str: + raise TypeError("You could only use string as key") + if type(actor_handle) != ray.actor.ActorHandle: + raise TypeError("You could only store named-actors.") actor_hash = _calculate_key_(name) - assert type(actor_handle) == ray.actor.ActorHandle, \ - "Error: you could only store named-actors." - is_existed = worker.redis_client.hexists(actor_hash, name) - assert not is_existed, \ - "Error: the actor with name={} already exists".format(name) pickled_state = pickle.dumps(actor_handle) - worker.redis_client.hmset(actor_hash, {name: pickled_state}) + is_existed = worker.redis_client.hsetnx(actor_hash, name, pickled_state) + + if is_existed == 0: + raise ValueError( + "Error: the actor with name={} already exists".format(name)) diff --git a/test/actor_test.py b/test/actor_test.py index d3ad4f5ce1a2..2b21baf41b2e 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1906,7 +1906,7 @@ def method(self): # we should also test this from a different driver. ray.get(new_f.method.remote()) - def testRegisterAndGetActorHandle(self): + def testRegisterAndGetNamedActors(self): # TODO(heyucongtom): We may want to test this from another driver? # One viable way might be setting up another ray process here, connecting to the # redis_address, sending the objectID back, and compare @@ -1915,8 +1915,12 @@ def testRegisterAndGetActorHandle(self): @ray.remote class Foo(object): + def __init__(self): + self.x = 0 + def method(self): - pass + self.x += 1 + return self.x f1 = Foo.remote() # Test saving f @@ -1926,16 +1930,23 @@ def method(self): self.assertEqual(f1._actor_id, f2._actor_id) # Test same name register shall raise error - with self.assertRaises(AssertionError): + with self.assertRaises(ValueError): ray.experimental.register_actor("f1", f2) # Test register with wrong object type - with self.assertRaises(AssertionError): + with self.assertRaises(TypeError): ray.experimental.register_actor("f3", 1) # Test getting an unexist actor - with self.assertRaises(AssertionError): - err = ray.experimental.get_actor("unexisted") + with self.assertRaises(ValueError): + err = ray.experimental.get_actor("nonexistent") + + # Test method + self.assertEqual(ray.get(f1.method.remote()), 1) + self.assertEqual(ray.get(f2.method.remote()), 2) + self.assertEqual(ray.get(f1.method.remote()), 3) + self.assertEqual(ray.get(f2.method.remote()), 4) + class ActorPlacementAndResources(unittest.TestCase): def tearDown(self): From e72f278a469e624cf409e4cc674772d7dec3185c Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 23 May 2018 16:53:54 -0700 Subject: [PATCH 4/6] Update __init__.py --- python/ray/experimental/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index fa632e46393d..58005f443001 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -7,7 +7,7 @@ flush_redis_unsafe, flush_task_and_object_metadata_unsafe, flush_finished_tasks_unsafe, flush_evicted_objects_unsafe, _flush_finished_tasks_unsafe_shard, _flush_evicted_objects_unsafe_shard) -from .named_actors import (get_actor, register_actor) +from .named_actors import get_actor, register_actor __all__ = [ "TensorFlowVariables", "flush_redis_unsafe", From a29098a293ae8b536f80ff99dbeaabd5277521fb Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 23 May 2018 17:11:31 -0700 Subject: [PATCH 5/6] Small changes. --- python/ray/experimental/named_actors.py | 43 ++++++++++++++----------- test/actor_test.py | 17 ++++------ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index 78e74739b16a..8243f97ba682 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -4,29 +4,33 @@ import ray import ray.cloudpickle as pickle -""" -This file contains functions intended to implement the named actor -""" -def _calculate_key_(name): - """Generate a Redis key with the given name +def _calculate_key(name): + """Generate a Redis key with the given name. + Args: - name: THe name of the named-actor + name: The name of the named actor. + + Returns: + The key to use for storing a named actor in Redis. """ - return b"Actor:" + str.encode(name) + return b"Actor:" + name.encode("ascii") def get_actor(name): - """Get a named actor which is previously created. If the actor - doesn't exist, it will return an error. + """Get a named actor which was previously created. + + If the actor doesn't exist, an exception will be raised. + Args: - name: The name of the named-actor. + name: The name of the named actor. + Returns: The ActorHandle object corresponding to the name. """ worker = ray.worker.get_global_worker() - actor_hash = _calculate_key_(name) + actor_hash = _calculate_key(name) pickled_state = worker.redis_client.hget(actor_hash, name) if pickled_state is None: raise ValueError("The actor with name={} doesn't exist".format(name)) @@ -36,19 +40,20 @@ def get_actor(name): def register_actor(name, actor_handle): """Register a named actor under a string key. + Args: - name: The name of the named-actor. + name: The name of the named actor. actor_handle: The actor object to be associated with this name """ worker = ray.worker.get_global_worker() - if type(name) != str: - raise TypeError("You could only use string as key") - if type(actor_handle) != ray.actor.ActorHandle: - raise TypeError("You could only store named-actors.") - actor_hash = _calculate_key_(name) + if not isinstance(actor_handle, ray.actor.ActorHandle): + raise TypeError("The actor_handle argument must be an ActorHandle " + "object.") + actor_hash = _calculate_key(name) pickled_state = pickle.dumps(actor_handle) - is_existed = worker.redis_client.hsetnx(actor_hash, name, pickled_state) - if is_existed == 0: + # Add the actor to Redis if it does not already exist. + updated = worker.redis_client.hsetnx(actor_hash, name, pickled_state) + if updated == 0: raise ValueError( "Error: the actor with name={} already exists".format(name)) diff --git a/test/actor_test.py b/test/actor_test.py index 2b21baf41b2e..4509748cf876 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1907,10 +1907,7 @@ def method(self): ray.get(new_f.method.remote()) def testRegisterAndGetNamedActors(self): - # TODO(heyucongtom): We may want to test this from another driver? - # One viable way might be setting up another ray process here, connecting to the - # redis_address, sending the objectID back, and compare - + # TODO(heyucongtom): We should test this from another driver. ray.worker.init(num_workers=1) @ray.remote @@ -1923,23 +1920,23 @@ def method(self): return self.x f1 = Foo.remote() - # Test saving f + # Test saving f. ray.experimental.register_actor("f1", f1) - # Test getting f + # Test getting f. f2 = ray.experimental.get_actor("f1") self.assertEqual(f1._actor_id, f2._actor_id) - # Test same name register shall raise error + # Test same name register shall raise error. with self.assertRaises(ValueError): ray.experimental.register_actor("f1", f2) - # Test register with wrong object type + # Test register with wrong object type. with self.assertRaises(TypeError): ray.experimental.register_actor("f3", 1) - # Test getting an unexist actor + # Test getting a nonexistent actor. with self.assertRaises(ValueError): - err = ray.experimental.get_actor("nonexistent") + ray.experimental.get_actor("nonexistent") # Test method self.assertEqual(ray.get(f1.method.remote()), 1) From 40d1b03c9921a21ef938fcb68f9f62443058fed9 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 23 May 2018 17:20:57 -0700 Subject: [PATCH 6/6] Add back check. --- python/ray/experimental/named_actors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index 8243f97ba682..9ae7972fc37c 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -46,6 +46,8 @@ def register_actor(name, actor_handle): actor_handle: The actor object to be associated with this name """ worker = ray.worker.get_global_worker() + if not isinstance(name, str): + raise TypeError("The name argument must be a string.") if not isinstance(actor_handle, ray.actor.ActorHandle): raise TypeError("The actor_handle argument must be an ActorHandle " "object.")