diff --git a/python/ray/actor.py b/python/ray/actor.py index 505dd07c6a53..4d1012f3c731 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -17,7 +17,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks): - """Deterministically comopute an actor handle ID. + """Deterministically compute an actor handle ID. A new actor handle ID is generated when it is forked from another actor handle. The new handle ID is computed as hash(old_handle_id || num_forks). @@ -28,7 +28,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks): forked so far. Returns: - An object ID for the new actor handle. + An ID for the new actor handle. """ handle_id_hash = hashlib.sha1() handle_id_hash.update(actor_handle_id.id()) @@ -38,6 +38,38 @@ def compute_actor_handle_id(actor_handle_id, num_forks): return ray.local_scheduler.ObjectID(handle_id) +def compute_actor_handle_id_non_forked(actor_id, actor_handle_id, + current_task_id): + """Deterministically compute an actor handle ID in the non-forked case. + + This code path is used whenever an actor handle is pickled and unpickled + (for example, if a remote function closes over an actor handle). Then, + whenever the actor handle is used, a new actor handle ID will be generated + on the fly as a deterministic function of the actor ID, the previous actor + handle ID and the current task ID. + + TODO(rkn): It may be possible to cause problems by closing over multiple + actor handles in a remote function, which then get unpickled and give rise + to the same actor handle IDs. + + Args: + actor_id: The actor ID. + actor_handle_id: The original actor handle ID. + num_forks: The number of times the original actor handle has been + forked so far. + + Returns: + An ID for the new actor handle. + """ + handle_id_hash = hashlib.sha1() + handle_id_hash.update(actor_id.id()) + handle_id_hash.update(actor_handle_id.id()) + handle_id_hash.update(current_task_id.id()) + handle_id = handle_id_hash.digest() + assert len(handle_id) == 20 + return ray.local_scheduler.ObjectID(handle_id) + + def compute_actor_creation_function_id(class_id): """Compute the function ID for an actor creation task. @@ -494,357 +526,470 @@ def remote(self, *args, **kwargs): dependency=self._actor._ray_actor_cursor) -class ActorHandleWrapper(object): - """A wrapper for the contents of an ActorHandle. - - This is essentially just a dictionary, but it is used so that the recipient - can tell that an argument is an ActorHandle. - """ +class ActorClass(object): + """An actor class. - def __init__(self, actor_id, class_id, actor_handle_id, actor_cursor, - actor_counter, actor_method_names, - actor_method_num_return_vals, method_signatures, - checkpoint_interval, class_name, - actor_creation_dummy_object_id, actor_creation_resources, - actor_method_cpus): - # TODO(rkn): Some of these fields are probably not necessary. We should - # strip out the unnecessary fields to keep actor handles lightweight. - self.actor_id = actor_id - self.class_id = class_id - self.actor_handle_id = actor_handle_id - self.actor_cursor = actor_cursor - self.actor_counter = actor_counter - self.actor_method_names = actor_method_names - self.actor_method_num_return_vals = actor_method_num_return_vals - # TODO(swang): Fetch this information from Redis so that we don't have - # to fall back to pickle. - self.method_signatures = method_signatures - self.checkpoint_interval = checkpoint_interval - self.class_name = class_name - self.actor_creation_dummy_object_id = actor_creation_dummy_object_id - self.actor_creation_resources = actor_creation_resources - self.actor_method_cpus = actor_method_cpus - - -def wrap_actor_handle(actor_handle): - """Wrap the ActorHandle to store the fields. + This is a decorated class. It can be used to create actors. - Args: - actor_handle: The ActorHandle instance to wrap. - - Returns: - An ActorHandleWrapper instance that stores the ActorHandle's fields. + Attributes: + _modified_class: The original class that was decorated (with some + additional methods added like __ray_terminate__). + _class_id: The ID of this actor class. + _class_name: The name of this class. + _checkpoint_interval: The interval at which to checkpoint actor state. + _actor_creation_resources: The default resources required by the actor + creation task. + _actor_method_cpus: The number of CPUs required by actor method tasks. + _exported: True if the actor class has been exported and false + otherwise. """ - wrapper = ActorHandleWrapper( - actor_handle._ray_actor_id, - actor_handle._ray_class_id, - compute_actor_handle_id(actor_handle._ray_actor_handle_id, - actor_handle._ray_actor_forks), - actor_handle._ray_actor_cursor, - 0, # Reset the actor counter. - actor_handle._ray_actor_method_names, - actor_handle._ray_actor_method_num_return_vals, - actor_handle._ray_method_signatures, - actor_handle._ray_checkpoint_interval, - actor_handle._ray_class_name, - actor_handle._ray_actor_creation_dummy_object_id, - actor_handle._ray_actor_creation_resources, - actor_handle._ray_actor_method_cpus) - actor_handle._ray_actor_forks += 1 - return wrapper - - -def unwrap_actor_handle(worker, wrapper): - """Make an ActorHandle from the stored fields. - Args: - worker: The worker that is unwrapping the actor handle. - wrapper: An ActorHandleWrapper instance to unwrap. + def __init__(self, modified_class, class_id, checkpoint_interval, + actor_creation_resources, actor_method_cpus): + self._modified_class = modified_class + self._class_id = class_id + self._class_name = modified_class.__name__.encode("ascii") + self._checkpoint_interval = checkpoint_interval + self._actor_creation_resources = actor_creation_resources + self._actor_method_cpus = actor_method_cpus + self._exported = False - Returns: - The unwrapped ActorHandle instance. - """ - driver_id = worker.task_driver_id.id() - register_actor_signatures( - worker, driver_id, wrapper.class_id, wrapper.class_name, - wrapper.actor_method_names, wrapper.actor_method_num_return_vals, - wrapper.actor_creation_resources, wrapper.actor_method_cpus) - - actor_handle_class = make_actor_handle_class(wrapper.class_name) - actor_object = actor_handle_class.__new__(actor_handle_class) - actor_object._manual_init( - wrapper.actor_id, wrapper.class_id, wrapper.actor_handle_id, - wrapper.actor_cursor, wrapper.actor_counter, - wrapper.actor_method_names, wrapper.actor_method_num_return_vals, - wrapper.method_signatures, wrapper.checkpoint_interval, - wrapper.actor_creation_dummy_object_id, - wrapper.actor_creation_resources, wrapper.actor_method_cpus) - return actor_object - - -class ActorHandleParent(object): - """This is the parent class of all ActorHandle classes. - - This enables us to identify actor handles by checking if an object obj - satisfies isinstance(obj, ActorHandleParent). - """ - pass + def remote(self, *args, **kwargs): + """Create an actor. + Args: + args: These arguments are forwarded directly to the actor + constructor. + kwargs: These arguments are forwarded directly to the actor + constructor. -def make_actor_handle_class(class_name): - class ActorHandle(ActorHandleParent): - def __init__(self, *args, **kwargs): - raise Exception("Actor classes cannot be instantiated directly. " - "Instead of running '{}()', try '{}.remote()'." - .format(class_name, class_name)) + Returns: + A handle to the newly created actor. + """ + return self._submit(args=args, kwargs=kwargs) - @classmethod - def remote(cls, *args, **kwargs): - raise NotImplementedError("The classmethod remote() can only be " - "called on the original Class.") - - def _manual_init(self, actor_id, class_id, actor_handle_id, - actor_cursor, actor_counter, actor_method_names, - actor_method_num_return_vals, method_signatures, - checkpoint_interval, actor_creation_dummy_object_id, - actor_creation_resources, actor_method_cpus): - self._ray_actor_id = actor_id - self._ray_class_id = class_id - self._ray_actor_handle_id = actor_handle_id - self._ray_actor_cursor = actor_cursor - self._ray_actor_counter = actor_counter - self._ray_actor_method_names = actor_method_names - self._ray_actor_method_num_return_vals = ( - actor_method_num_return_vals) - self._ray_method_signatures = method_signatures - self._ray_checkpoint_interval = checkpoint_interval - self._ray_class_name = class_name - self._ray_actor_forks = 0 - self._ray_actor_creation_dummy_object_id = ( - actor_creation_dummy_object_id) - self._ray_actor_creation_resources = actor_creation_resources - self._ray_actor_method_cpus = actor_method_cpus - - def _actor_method_call(self, - method_name, - args=None, - kwargs=None, - dependency=None): - """Method execution stub for an actor handle. - - This is the function that executes when - `actor.method_name.remote(*args, **kwargs)` is called. Instead of - executing locally, the method is packaged as a task and scheduled - to the remote actor instance. - - Args: - self: The local actor handle. - method_name: The name of the actor method to execute. - args: A list of arguments for the actor method. - kwargs: A dictionary of keyword arguments for the actor method. - dependency: The object ID that this method is dependent on. - Defaults to None, for no dependencies. Most tasks should - pass in the dummy object returned by the preceding task. - Some tasks, such as checkpoint and terminate methods, have - no dependencies. + def _submit(self, + args, + kwargs, + num_cpus=None, + num_gpus=None, + resources=None): + """Create an actor. + + This method allows more flexibility than the remote method because + resource requirements can be specified and override the defaults in the + decorator. + + Args: + args: The arguments to forward to the actor constructor. + kwargs: The keyword arguments to forward to the actor constructor. + num_cpus: The number of CPUs required by the actor creation task. + num_gpus: The number of GPUs required by the actor creation task. + resources: The custom resources required by the actor creation + task. + + Returns: + A handle to the newly created actor. + """ + if ray.worker.global_worker.mode is None: + raise Exception("Actors cannot be created before ray.init() " + "has been called.") + + actor_id = ray.local_scheduler.ObjectID(_random_string()) + # The actor cursor is a dummy object representing the most recent + # actor method invocation. For each subsequent method invocation, + # the current cursor should be added as a dependency, and then + # updated to reflect the new invocation. + actor_cursor = None + + # Get the actor methods of the given class. + def pred(x): + return (inspect.isfunction(x) or inspect.ismethod(x) + or is_cython(x)) - Returns: - object_ids: A list of object IDs returned by the remote actor - method. - """ - ray.worker.check_connected() - ray.worker.check_main_thread() - function_signature = self._ray_method_signatures[method_name] - if args is None: - args = [] - if kwargs is None: - kwargs = {} - args = signature.extend_args(function_signature, args, kwargs) - - # Execute functions locally if Ray is run in PYTHON_MODE - # Copy args to prevent the function from mutating them. - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - return getattr( - ray.worker.global_worker.actors[self._ray_actor_id], - method_name)(*copy.deepcopy(args)) - - # Add the execution dependency. - if dependency is None: - execution_dependencies = [] + actor_methods = inspect.getmembers( + self._modified_class, predicate=pred) + # Extract the signatures of each of the methods. This will be used + # to catch some errors if the methods are called with inappropriate + # arguments. + method_signatures = dict() + for k, v in actor_methods: + # Print a warning message if the method signature is not + # supported. We don't raise an exception because if the actor + # inherits from a class that has a method whose signature we + # don't support, there may not be much the user can do about it. + signature.check_signature_supported(v, warn=True) + method_signatures[k] = signature.extract_signature( + v, ignore_first=True) + + actor_method_names = [method_name for method_name, _ in actor_methods] + actor_method_num_return_vals = [] + for _, method in actor_methods: + if hasattr(method, "__ray_num_return_vals__"): + actor_method_num_return_vals.append( + method.__ray_num_return_vals__) else: - execution_dependencies = [dependency] - - is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") + actor_method_num_return_vals.append(1) + # Do not export the actor class or the actor if run in PYTHON_MODE + # Instead, instantiate the actor locally and add it to + # global_worker's dictionary + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + ray.worker.global_worker.actors[actor_id] = ( + self._modified_class.__new__(self._modified_class)) + else: + # Export the actor. + if not self._exported: + export_actor_class( + self._class_id, self._modified_class, actor_method_names, + actor_method_num_return_vals, self._checkpoint_interval, + ray.worker.global_worker) + self._exported = True + actor_cursor = export_actor( + actor_id, self._class_id, self._class_name, actor_method_names, + actor_method_num_return_vals, self._actor_creation_resources, + self._actor_method_cpus, ray.worker.global_worker) + + # We initialize the actor counter at 1 to account for the actor + # creation task. + actor_counter = 1 + actor_handle = ActorHandle( + actor_id, self._class_name, actor_cursor, actor_counter, + actor_method_names, actor_method_num_return_vals, + method_signatures, actor_cursor, self._actor_method_cpus, + ray.worker.global_worker.task_driver_id) + + # Call __init__ as a remote function. + if "__init__" in actor_handle._ray_actor_method_names: + actor_handle.__init__.remote(*args, **kwargs) + else: + if len(args) != 0 or len(kwargs) != 0: + raise Exception("Arguments cannot be passed to the actor " + "constructor because this actor class has no " + "__init__ method.") + + return actor_handle + + @property + def class_id(self): + return self._class_id + + +class ActorHandle(object): + """A handle to an actor. + + The fields in this class are prefixed with _ray_ to hide them from the user + and to avoid collision with actor method names. + + An ActorHandle can be created in three ways. First, by calling .remote() on + an ActorClass. Second, by passing an actor handle into a task (forking the + ActorHandle). Third, by directly serializing the ActorHandle (e.g., with + cloudpickle). + + Attributes: + _ray_actor_id: The ID of the corresponding actor. + _ray_actor_handle_id: The ID of this handle. If this is the "original" + handle for an actor (as opposed to one created by passing another + handle into a task), then this ID must be NIL_ID. If this + ActorHandle was created by forking an existing ActorHandle, then + this ID must be computed deterministically via + compute_actor_handle_id. If this ActorHandle was created by an + out-of-band mechanism (e.g., pickling), then this must be None (in + this case, a new actor handle ID will be generated on the fly every + time a method is invoked). + _ray_actor_cursor: The actor cursor is a dummy object representing the + most recent actor method invocation. For each subsequent method + invocation, the current cursor should be added as a dependency, and + then updated to reflect the new invocation. + _ray_actor_counter: The number of actor method invocations that we've + called so far. + _ray_actor_method_names: The names of the actor methods. + _ray_actor_method_num_return_vals: The number of return values for each + actor method. + _ray_method_signatures: The signatures of the actor methods. + _ray_class_name: The name of the actor class. + _ray_actor_forks: The number of times this handle has been forked. + _ray_actor_creation_dummy_object_id: The dummy object ID from the actor + creation task. + _ray_actor_method_cpus: The number of CPUs required by actor methods. + _ray_original_handle: True if this is the original actor handle for a + given actor. If this is true, then the actor will be destroyed when + this handle goes out of scope. + _ray_actor_driver_id: The driver ID of the job that created the actor + (it is possible that this ActorHandle exists on a driver with a + different driver ID). + _ray_previous_actor_handle_id: If this actor handle is not an original + handle, (e.g., it was created by forking or pickling), then + this is the ID of the handle that this handle was created from. + Otherwise, this is None. + """ - function_id = compute_actor_method_function_id( - self._ray_class_name, method_name) - object_ids = ray.worker.global_worker.submit_task( - function_id, - args, - actor_id=self._ray_actor_id, - actor_handle_id=self._ray_actor_handle_id, - actor_counter=self._ray_actor_counter, - is_actor_checkpoint_method=is_actor_checkpoint_method, - actor_creation_dummy_object_id=( - self._ray_actor_creation_dummy_object_id), - execution_dependencies=execution_dependencies) - # Update the actor counter and cursor to reflect the most recent - # invocation. - self._ray_actor_counter += 1 - self._ray_actor_cursor = object_ids.pop() - - # The last object returned is the dummy object that should be - # passed in to the next actor method. Do not return it to the user. - if len(object_ids) == 1: - return object_ids[0] - elif len(object_ids) > 1: - return object_ids - - # Make tab completion work. - def __dir__(self): - return self._ray_actor_method_names - - def __getattribute__(self, attr): - try: - # Check whether this is an actor method. - actor_method_names = object.__getattribute__( - self, "_ray_actor_method_names") - if attr in actor_method_names: - # We create the ActorMethod on the fly here so that the - # ActorHandle doesn't need a reference to the ActorMethod. - # The ActorMethod has a reference to the ActorHandle and - # this was causing cyclic references which were prevent - # object deallocation from behaving in a predictable - # manner. - actor_method_cls = ActorMethod - return actor_method_cls(self, attr) - except AttributeError: - pass - - # If the requested attribute is not a registered method, fall back - # to default __getattribute__. - return object.__getattribute__(self, attr) - - def __repr__(self): - return "Actor(" + self._ray_actor_id.hex() + ")" - - def __reduce__(self): - raise Exception("Actor objects cannot be pickled.") - - def __del__(self): - """Kill the worker that is running this actor.""" - # TODO(swang): Also clean up forked actor handles. - # Kill the worker if this is the original actor handle, created - # with Class.remote(). - if (ray.worker.global_worker.connected and - self._ray_actor_handle_id.id() == ray.worker.NIL_ACTOR_ID): - # TODO(rkn): Should we be passing in the actor cursor as a - # dependency here? - self._actor_method_call( - "__ray_terminate__", args=[self._ray_actor_id.id()]) - - return ActorHandle - - -def actor_handle_from_class(Class, class_id, actor_creation_resources, - checkpoint_interval, actor_method_cpus): - class_name = Class.__name__.encode("ascii") - actor_handle_class = make_actor_handle_class(class_name) - exported = [] - - class ActorHandle(actor_handle_class): - @classmethod - def remote(cls, *args, **kwargs): - if ray.worker.global_worker.mode is None: - raise Exception("Actors cannot be created before ray.init() " - "has been called.") - - actor_id = ray.local_scheduler.ObjectID(_random_string()) - # The ID for this instance of ActorHandle. These should be unique - # across instances with the same _ray_actor_id. - actor_handle_id = ray.local_scheduler.ObjectID( - ray.worker.NIL_ACTOR_ID) - # The actor cursor is a dummy object representing the most recent - # actor method invocation. For each subsequent method invocation, - # the current cursor should be added as a dependency, and then - # updated to reflect the new invocation. - actor_cursor = None - # The number of actor method invocations that we've called so far. - actor_counter = 0 - - # Get the actor methods of the given class. - def pred(x): - return (inspect.isfunction(x) or inspect.ismethod(x) - or is_cython(x)) - - actor_methods = inspect.getmembers(Class, predicate=pred) - # Extract the signatures of each of the methods. This will be used - # to catch some errors if the methods are called with inappropriate - # arguments. - method_signatures = dict() - for k, v in actor_methods: - # Print a warning message if the method signature is not - # supported. We don't raise an exception because if the actor - # inherits from a class that has a method whose signature we - # don't support, we there may not be much the user can do about - # it. - signature.check_signature_supported(v, warn=True) - method_signatures[k] = signature.extract_signature( - v, ignore_first=True) - - actor_method_names = [ - method_name for method_name, _ in actor_methods - ] - actor_method_num_return_vals = [] - for _, method in actor_methods: - if hasattr(method, "__ray_num_return_vals__"): - actor_method_num_return_vals.append( - method.__ray_num_return_vals__) - else: - actor_method_num_return_vals.append(1) - # Do not export the actor class or the actor if run in PYTHON_MODE - # Instead, instantiate the actor locally and add it to - # global_worker's dictionary - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - ray.worker.global_worker.actors[actor_id] = ( - Class.__new__(Class)) - else: - # Export the actor. - if not exported: - export_actor_class(class_id, Class, actor_method_names, - actor_method_num_return_vals, - checkpoint_interval, - ray.worker.global_worker) - exported.append(0) - actor_cursor = export_actor( - actor_id, class_id, class_name, actor_method_names, - actor_method_num_return_vals, actor_creation_resources, - actor_method_cpus, ray.worker.global_worker) - # Increment the actor counter to account for the creation task. - actor_counter += 1 - - # Instantiate the actor handle. - actor_object = cls.__new__(cls) - actor_object._manual_init( - actor_id, class_id, actor_handle_id, actor_cursor, - actor_counter, actor_method_names, - actor_method_num_return_vals, method_signatures, - checkpoint_interval, actor_cursor, actor_creation_resources, - actor_method_cpus) - - # Call __init__ as a remote function. - if "__init__" in actor_object._ray_actor_method_names: - actor_object._actor_method_call( - "__init__", - args=args, - kwargs=kwargs, - dependency=actor_cursor) - else: - print("WARNING: this object has no __init__ method.") + def __init__(self, + actor_id, + class_name, + actor_cursor, + actor_counter, + actor_method_names, + actor_method_num_return_vals, + method_signatures, + actor_creation_dummy_object_id, + actor_method_cpus, + actor_driver_id, + actor_handle_id=None, + previous_actor_handle_id=None): + # False if this actor handle was created by forking or pickling. True + # if it was created by the _serialization_helper function. + self._ray_original_handle = previous_actor_handle_id is None + + self._ray_actor_id = actor_id + if self._ray_original_handle: + self._ray_actor_handle_id = ray.local_scheduler.ObjectID( + ray.worker.NIL_ACTOR_HANDLE_ID) + else: + self._ray_actor_handle_id = actor_handle_id + self._ray_actor_cursor = actor_cursor + self._ray_actor_counter = actor_counter + self._ray_actor_method_names = actor_method_names + self._ray_actor_method_num_return_vals = actor_method_num_return_vals + self._ray_method_signatures = method_signatures + self._ray_class_name = class_name + self._ray_actor_forks = 0 + self._ray_actor_creation_dummy_object_id = ( + actor_creation_dummy_object_id) + self._ray_actor_method_cpus = actor_method_cpus + self._ray_actor_driver_id = actor_driver_id + self._ray_previous_actor_handle_id = previous_actor_handle_id + + def _actor_method_call(self, + method_name, + args=None, + kwargs=None, + num_return_vals=None, + dependency=None): + """Method execution stub for an actor handle. + + This is the function that executes when + `actor.method_name.remote(*args, **kwargs)` is called. Instead of + executing locally, the method is packaged as a task and scheduled + to the remote actor instance. + + Args: + method_name: The name of the actor method to execute. + args: A list of arguments for the actor method. + kwargs: A dictionary of keyword arguments for the actor method. + dependency: The object ID that this method is dependent on. + Defaults to None, for no dependencies. Most tasks should + pass in the dummy object returned by the preceding task. + Some tasks, such as checkpoint and terminate methods, have + no dependencies. + + Returns: + object_ids: A list of object IDs returned by the remote actor + method. + """ + ray.worker.check_connected() + ray.worker.check_main_thread() + function_signature = self._ray_method_signatures[method_name] + if args is None: + args = [] + if kwargs is None: + kwargs = {} + args = signature.extend_args(function_signature, args, kwargs) + + # Execute functions locally if Ray is run in PYTHON_MODE + # Copy args to prevent the function from mutating them. + if ray.worker.global_worker.mode == ray.PYTHON_MODE: + return getattr(ray.worker.global_worker.actors[self._ray_actor_id], + method_name)(*copy.deepcopy(args)) + + # Add the execution dependency. + if dependency is None: + execution_dependencies = [] + else: + execution_dependencies = [dependency] - return actor_object + is_actor_checkpoint_method = (method_name == "__ray_checkpoint__") - return ActorHandle + if self._ray_actor_handle_id is None: + actor_handle_id = compute_actor_handle_id_non_forked( + self._ray_actor_id, self._ray_previous_actor_handle_id, + ray.worker.global_worker.current_task_id) + else: + actor_handle_id = self._ray_actor_handle_id + + function_id = compute_actor_method_function_id(self._ray_class_name, + method_name) + object_ids = ray.worker.global_worker.submit_task( + function_id, + args, + actor_id=self._ray_actor_id, + actor_handle_id=actor_handle_id, + actor_counter=self._ray_actor_counter, + is_actor_checkpoint_method=is_actor_checkpoint_method, + actor_creation_dummy_object_id=( + self._ray_actor_creation_dummy_object_id), + execution_dependencies=execution_dependencies, + num_return_vals=num_return_vals, + driver_id=self._ray_actor_driver_id) + # Update the actor counter and cursor to reflect the most recent + # invocation. + self._ray_actor_counter += 1 + # The last object returned is the dummy object that should be + # passed in to the next actor method. Do not return it to the user. + self._ray_actor_cursor = object_ids.pop() + + if len(object_ids) == 1: + object_ids = object_ids[0] + elif len(object_ids) == 0: + object_ids = None + + return object_ids + + # Make tab completion work. + def __dir__(self): + return self._ray_actor_method_names + + def __getattribute__(self, attr): + try: + # Check whether this is an actor method. + actor_method_names = object.__getattribute__( + self, "_ray_actor_method_names") + if attr in actor_method_names: + # We create the ActorMethod on the fly here so that the + # ActorHandle doesn't need a reference to the ActorMethod. + # The ActorMethod has a reference to the ActorHandle and + # this was causing cyclic references which were prevent + # object deallocation from behaving in a predictable + # manner. + actor_method_cls = ActorMethod + return actor_method_cls(self, attr) + except AttributeError: + pass + + # If the requested attribute is not a registered method, fall back + # to default __getattribute__. + return object.__getattribute__(self, attr) + + def __repr__(self): + return "Actor(" + self._ray_actor_id.hex() + ")" + + def __del__(self): + """Kill the worker that is running this actor.""" + # TODO(swang): Also clean up forked actor handles. + # Kill the worker if this is the original actor handle, created + # with Class.remote(). TODO(rkn): Even without passing handles around, + # this is not the right policy. the actor should be alive as long as + # there are ANY handles in scope in the process that created the actor, + # not just the first one. + if ray.worker.global_worker.connected and self._ray_original_handle: + # TODO(rkn): Should we be passing in the actor cursor as a + # dependency here? + # self.__ray__terminate__.remote() + self._actor_method_call("__ray_terminate__") + + @property + def _actor_id(self): + return self._ray_actor_id + + @property + def _actor_handle_id(self): + return self._ray_actor_handle_id + + def _serialization_helper(self, ray_forking): + """This is defined in order to make pickling work. + + Args: + ray_forking: True if this is being called because Ray is forking + the actor handle and false if it is being called by pickling. + + Returns: + A dictionary of the information needed to reconstruct the object. + """ + state = { + "actor_id": + self._ray_actor_id.id(), + "class_name": + self._ray_class_name, + "actor_forks": + self._ray_actor_forks, + "actor_cursor": + self._ray_actor_cursor.id(), + "actor_counter": + 0, # Reset the actor counter. + "actor_method_names": + self._ray_actor_method_names, + "actor_method_num_return_vals": + self._ray_actor_method_num_return_vals, + "method_signatures": + self._ray_method_signatures, + "actor_creation_dummy_object_id": + self._ray_actor_creation_dummy_object_id.id(), + "actor_method_cpus": + self._ray_actor_method_cpus, + "actor_driver_id": + self._ray_actor_driver_id.id(), + "previous_actor_handle_id": + self._ray_actor_handle_id.id(), + "ray_forking": + ray_forking + } + + if ray_forking: + self._ray_actor_forks += 1 + + return state + + def _deserialization_helper(self, state, ray_forking): + """This is defined in order to make pickling work. + + Args: + state: The serialized state of the actor handle. + ray_forking: True if this is being called because Ray is forking + the actor handle and false if it is being called by pickling. + """ + ray.worker.check_connected() + ray.worker.check_main_thread() + + if state["ray_forking"]: + actor_handle_id = compute_actor_handle_id( + ray.local_scheduler.ObjectID( + state["previous_actor_handle_id"]), state["actor_forks"]) + else: + actor_handle_id = None + + # This is the driver ID of the driver that owns the actor, not + # necessarily the driver that owns this actor handle. + actor_driver_id = ray.local_scheduler.ObjectID( + state["actor_driver_id"]) + + self.__init__( + ray.local_scheduler.ObjectID(state["actor_id"]), + state["class_name"], + ray.local_scheduler.ObjectID(state["actor_cursor"]), + state["actor_counter"], + state["actor_method_names"], + state["actor_method_num_return_vals"], + state["method_signatures"], + ray.local_scheduler.ObjectID( + state["actor_creation_dummy_object_id"]), + state["actor_method_cpus"], + actor_driver_id, + actor_handle_id=actor_handle_id, + previous_actor_handle_id=ray.local_scheduler.ObjectID( + state["previous_actor_handle_id"])) + + register_actor_signatures( + ray.worker.global_worker, actor_driver_id.id(), None, + self._ray_class_name, self._ray_actor_method_names, + self._ray_actor_method_num_return_vals, None, + self._ray_actor_method_cpus) + + def __getstate__(self): + """This code path is used by pickling but not by Ray forking.""" + return self._serialization_helper(False) + + def __setstate__(self, state): + """This code path is used by pickling but not by Ray forking.""" + return self._deserialization_helper(state, False) def make_actor(cls, resources, checkpoint_interval, actor_method_cpus): @@ -854,12 +999,7 @@ def make_actor(cls, resources, checkpoint_interval, actor_method_cpus): # Modify the class to have an additional method that will be used for # terminating the worker. class Class(cls): - def __ray_terminate__(self, actor_id): - # Record that this actor has been removed so that if this node - # dies later, the actor won't be recreated. Alternatively, we could - # remove the actor key from Redis here. - ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id, - "removed", True) + def __ray_terminate__(self): # Disconnect the worker from the local scheduler. The point of this # is so that when the worker kills itself below, the local # scheduler won't push an error message to the driver. @@ -945,8 +1085,8 @@ def __ray_checkpoint_restore__(self): class_id = _random_string() - return actor_handle_from_class(Class, class_id, resources, - checkpoint_interval, actor_method_cpus) + return ActorClass(Class, class_id, checkpoint_interval, resources, + actor_method_cpus) ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor diff --git a/python/ray/rllib/a3c/a3c.py b/python/ray/rllib/a3c/a3c.py index fa10db9c918c..15f5aa187bc9 100644 --- a/python/ray/rllib/a3c/a3c.py +++ b/python/ray/rllib/a3c/a3c.py @@ -126,7 +126,7 @@ def _fetch_metrics_from_remote_evaluators(self): def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: - ev.__ray_terminate__.remote(ev._ray_actor_id.id()) + ev.__ray_terminate__.remote() def _save(self, checkpoint_dir): checkpoint_path = os.path.join( diff --git a/python/ray/rllib/ddpg/ddpg.py b/python/ray/rllib/ddpg/ddpg.py index 343b323948b3..ae5e65e5bc3b 100644 --- a/python/ray/rllib/ddpg/ddpg.py +++ b/python/ray/rllib/ddpg/ddpg.py @@ -234,7 +234,7 @@ def _train_stats(self, start_timestep): def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: - ev.__ray_terminate__.remote(ev._ray_actor_id.id()) + ev.__ray_terminate__.remote() def _save(self, checkpoint_dir): checkpoint_path = self.saver.save( diff --git a/python/ray/rllib/dqn/dqn.py b/python/ray/rllib/dqn/dqn.py index cd7e85847a93..9a0da4e80704 100644 --- a/python/ray/rllib/dqn/dqn.py +++ b/python/ray/rllib/dqn/dqn.py @@ -232,7 +232,7 @@ def _train_stats(self, start_timestep): def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: - ev.__ray_terminate__.remote(ev._ray_actor_id.id()) + ev.__ray_terminate__.remote() def _save(self, checkpoint_dir): checkpoint_path = self.saver.save( diff --git a/python/ray/rllib/es/es.py b/python/ray/rllib/es/es.py index ca1bf4da69fe..f5ea4fa373ed 100644 --- a/python/ray/rllib/es/es.py +++ b/python/ray/rllib/es/es.py @@ -311,7 +311,7 @@ def _train(self): def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for w in self.workers: - w.__ray_terminate__.remote(w._ray_actor_id.id()) + w.__ray_terminate__.remote() def _save(self, checkpoint_dir): checkpoint_path = os.path.join( diff --git a/python/ray/rllib/ppo/ppo.py b/python/ray/rllib/ppo/ppo.py index 8f550d318f43..a8c695033e9a 100644 --- a/python/ray/rllib/ppo/ppo.py +++ b/python/ray/rllib/ppo/ppo.py @@ -269,7 +269,7 @@ def _fetch_metrics_from_remote_evaluators(self): def _stop(self): # workaround for https://github.com/ray-project/ray/issues/1516 for ev in self.remote_evaluators: - ev.__ray_terminate__.remote(ev._ray_actor_id.id()) + ev.__ray_terminate__.remote() def _save(self, checkpoint_dir): checkpoint_path = self.saver.save( diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index d42114cb0011..a7f604bc2098 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -30,7 +30,7 @@ def count(self): def drop_colocated(actors): colocated, non_colocated = split_colocated(actors) for a in colocated: - a.__ray_terminate__.remote(a._ray_actor_id.id()) + a.__ray_terminate__.remote() return non_colocated diff --git a/python/ray/tune/trial.py b/python/ray/tune/trial.py index 9d12e768ce8d..7800cba29f75 100644 --- a/python/ray/tune/trial.py +++ b/python/ray/tune/trial.py @@ -182,9 +182,7 @@ def stop(self, error=False, error_msg=None, stop_logger=True): if self.runner: stop_tasks = [] stop_tasks.append(self.runner.stop.remote()) - stop_tasks.append( - self.runner.__ray_terminate__.remote( - self.runner._ray_actor_id.id())) + stop_tasks.append(self.runner.__ray_terminate__.remote()) # TODO(ekl) seems like wait hangs when killing actors _, unfinished = ray.wait( stop_tasks, num_returns=2, timeout=250) diff --git a/python/ray/worker.py b/python/ray/worker.py index dc246ce8e293..462bae50930a 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -267,7 +267,7 @@ def set_mode(self, mode): print any information about errors because some of the tests intentionally fail. - args: + Args: mode: One of SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, and SILENT_MODE. """ @@ -363,11 +363,6 @@ def put_object(self, object_id, value): "do this, you can wrap the ObjectID in a list and " "call 'put' on it (or return it).") - if isinstance(value, ray.actor.ActorHandleParent): - raise Exception("Calling 'put' on an actor handle is currently " - "not allowed (similarly, returning an actor " - "handle from a remote function is not allowed).") - # Serialize and put the object in the object store. try: self.store_and_register(object_id, value) @@ -525,7 +520,8 @@ def submit_task(self, num_return_vals=None, num_cpus=None, num_gpus=None, - resources=None): + resources=None, + driver_id=None): """Submit a remote task to the scheduler. Tell the scheduler to schedule the execution of the function with ID @@ -552,6 +548,11 @@ def submit_task(self, num_cpus: The number of CPUs required by this task. num_gpus: The number of GPUs required by this task. resources: The resource requirements for this task. + driver_id: The ID of the relevant driver. This is almost always the + driver ID of the driver that is currently running. However, in + the exceptional case that an actor task is being dispatched to + an actor created by a different driver, this should be the + driver ID of the driver that created the actor. Returns: The return object IDs for this task. @@ -579,9 +580,6 @@ def submit_task(self, for arg in args: if isinstance(arg, ray.local_scheduler.ObjectID): args_for_local_scheduler.append(arg) - elif isinstance(arg, ray.actor.ActorHandleParent): - args_for_local_scheduler.append( - put(ray.actor.wrap_actor_handle(arg))) elif ray.local_scheduler.check_simple_value(arg): args_for_local_scheduler.append(arg) else: @@ -591,9 +589,12 @@ def submit_task(self, if execution_dependencies is None: execution_dependencies = [] + if driver_id is None: + driver_id = self.task_driver_id + # Look up the various function properties. - function_properties = self.function_properties[ - self.task_driver_id.id()][function_id.id()] + function_properties = self.function_properties[driver_id.id()][ + function_id.id()] if num_return_vals is None: num_return_vals = function_properties.num_return_vals @@ -610,8 +611,7 @@ def submit_task(self, # Submit the task to local scheduler. task = ray.local_scheduler.Task( - self.task_driver_id, - ray.local_scheduler.ObjectID( + driver_id, ray.local_scheduler.ObjectID( function_id.id()), args_for_local_scheduler, num_return_vals, self.current_task_id, self.task_index, actor_creation_id, actor_creation_dummy_object_id, actor_id, @@ -749,8 +749,6 @@ def _get_arguments_for_execution(self, function_name, serialized_args): # created this object failed, and we should propagate the # error message here. raise RayGetArgumentError(function_name, i, arg, argument) - elif isinstance(argument, ray.actor.ActorHandleWrapper): - argument = ray.actor.unwrap_actor_handle(self, argument) else: # pass the argument by value argument = arg @@ -779,6 +777,10 @@ def _store_outputs_in_objstore(self, object_ids, outputs): passed into this function. """ for i in range(len(object_ids)): + if isinstance(outputs[i], ray.actor.ActorHandle): + raise Exception("Returning an actor handle from a remote " + "function is not allowed).") + self.put_object(object_ids[i], outputs[i]) def _process_task(self, task): @@ -1137,18 +1139,39 @@ def _initialize_serialization(worker=global_worker): pyarrow.register_torch_serialization_handlers(worker.serialization_context) # Define a custom serializer and deserializer for handling Object IDs. - def objectid_custom_serializer(obj): + def object_id_custom_serializer(obj): return obj.id() - def objectid_custom_deserializer(serialized_obj): + def object_id_custom_deserializer(serialized_obj): return ray.local_scheduler.ObjectID(serialized_obj) + # We register this serializer on each worker instead of calling + # register_custom_serializer from the driver so that isinstance still + # works. worker.serialization_context.register_type( ray.local_scheduler.ObjectID, "ray.ObjectID", pickle=False, - custom_serializer=objectid_custom_serializer, - custom_deserializer=objectid_custom_deserializer) + custom_serializer=object_id_custom_serializer, + custom_deserializer=object_id_custom_deserializer) + + def actor_handle_serializer(obj): + return obj._serialization_helper(True) + + def actor_handle_deserializer(serialized_obj): + new_handle = ray.actor.ActorHandle.__new__(ray.actor.ActorHandle) + new_handle._deserialization_helper(serialized_obj, True) + return new_handle + + # We register this serializer on each worker instead of calling + # register_custom_serializer from the driver so that isinstance still + # works. + worker.serialization_context.register_type( + ray.actor.ActorHandle, + "ray.ActorHandle", + pickle=False, + custom_serializer=actor_handle_serializer, + custom_deserializer=actor_handle_deserializer) if worker.mode in [SCRIPT_MODE, SILENT_MODE]: # These should only be called on the driver because @@ -1161,8 +1184,6 @@ def objectid_custom_deserializer(serialized_obj): register_custom_serializer(type(lambda: 0), use_pickle=True) # Tell Ray to serialize types with pickle. register_custom_serializer(type(int), use_pickle=True) - # Ray can serialize actor handles that have been wrapped. - register_custom_serializer(ray.actor.ActorHandleWrapper, use_dict=True) # Tell Ray to serialize FunctionSignatures as dictionaries. This is # used when passing around actor handles. register_custom_serializer( diff --git a/test/actor_test.py b/test/actor_test.py index 6114d2e1feb5..1bbf4d5103ad 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1822,7 +1822,12 @@ def testCallingPutOnActorHandle(self): @ray.remote class Counter(object): - pass + def __init__(self): + self.x = 0 + + def inc(self): + self.x += 1 + return self.x @ray.remote def f(): @@ -1832,18 +1837,34 @@ def f(): def g(): return [Counter.remote()] - with self.assertRaises(Exception): - ray.put(Counter.remote()) + # Currently, calling ray.put on an actor handle is allowed, but is + # there a good use case? + counter = Counter.remote() + counter_id = ray.put(counter) + new_counter = ray.get(counter_id) + assert ray.get(new_counter.inc.remote()) == 1 + assert ray.get(counter.inc.remote()) == 2 + assert ray.get(new_counter.inc.remote()) == 3 with self.assertRaises(Exception): ray.get(f.remote()) - # The below test is commented out because it currently does not behave - # properly. The call to g.remote() does not raise an exception because - # even though the actor handle cannot be pickled, pyarrow attempts to - # serialize it as a dictionary of its fields which kind of works. - # self.assertRaises(Exception): - # ray.get(g.remote()) + # The below test works, but do we want to disallow this usage? + ray.get(g.remote()) + + def testPicklingActorHandle(self): + ray.worker.init(num_workers=1) + + @ray.remote + class Foo(object): + def method(self): + pass + + f = Foo.remote() + new_f = ray.worker.pickle.loads(ray.worker.pickle.dumps(f)) + # Verify that we can call a method on the unpickled handle. TODO(rkn): + # we should also test this from a different driver. + ray.get(new_f.method.remote()) class ActorPlacementAndResources(unittest.TestCase):