diff --git a/python/ray/__init__.py b/python/ray/__init__.py index eee900c1d7dc..c45b81d5ec7a 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -46,7 +46,7 @@ e.args += (helpful_message, ) raise -from ray.local_scheduler import _config # noqa: E402 +from ray.local_scheduler import ObjectID, _config # noqa: E402 from ray.worker import (error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids, get_webui_url, @@ -68,7 +68,7 @@ "remote", "log_event", "log_span", "flush_log", "actor", "method", "get_gpu_ids", "get_webui_url", "register_custom_serializer", "SCRIPT_MODE", "WORKER_MODE", "PYTHON_MODE", "SILENT_MODE", "global_state", - "_config", "__version__" + "ObjectID", "_config", "__version__" ] import ctypes # noqa: E402 diff --git a/python/ray/actor.py b/python/ray/actor.py index 4d1012f3c731..1023e334d486 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -12,8 +12,9 @@ import ray.local_scheduler import ray.signature as signature import ray.worker -from ray.utils import (FunctionProperties, _random_string, is_cython, - push_error_to_driver) +from ray.utils import _random_string, is_cython, push_error_to_driver + +DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1 def compute_actor_handle_id(actor_handle_id, num_forks): @@ -35,7 +36,7 @@ def compute_actor_handle_id(actor_handle_id, num_forks): handle_id_hash.update(str(num_forks).encode("ascii")) handle_id = handle_id_hash.digest() assert len(handle_id) == 20 - return ray.local_scheduler.ObjectID(handle_id) + return ray.ObjectID(handle_id) def compute_actor_handle_id_non_forked(actor_id, actor_handle_id, @@ -67,7 +68,7 @@ def compute_actor_handle_id_non_forked(actor_id, actor_handle_id, handle_id_hash.update(current_task_id.id()) handle_id = handle_id_hash.digest() assert len(handle_id) == 20 - return ray.local_scheduler.ObjectID(handle_id) + return ray.ObjectID(handle_id) def compute_actor_creation_function_id(class_id): @@ -79,7 +80,7 @@ def compute_actor_creation_function_id(class_id): Returns: The function ID of the actor creation event. """ - return ray.local_scheduler.ObjectID(class_id) + return ray.ObjectID(class_id) def compute_actor_method_function_id(class_name, attr): @@ -93,11 +94,11 @@ def compute_actor_method_function_id(class_name, attr): Function ID corresponding to the method. """ function_id_hash = hashlib.sha1() - function_id_hash.update(class_name) + function_id_hash.update(class_name.encode("ascii")) function_id_hash.update(attr.encode("ascii")) function_id = function_id_hash.digest() assert len(function_id) == 20 - return ray.local_scheduler.ObjectID(function_id) + return ray.ObjectID(function_id) def set_actor_checkpoint(worker, actor_id, checkpoint_index, checkpoint, @@ -257,7 +258,7 @@ def actor_method_executor(dummy_return_id, actor, *args): return actor_method_executor -def fetch_and_register_actor(actor_class_key, resources, worker): +def fetch_and_register_actor(actor_class_key, worker): """Import an actor. This will be called by the worker's import thread when the worker receives @@ -266,25 +267,20 @@ def fetch_and_register_actor(actor_class_key, resources, worker): Args: actor_class_key: The key in Redis to use to fetch the actor. - resources: The resources required for this actor's lifetime. worker: The worker to use. 
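# --- Illustrative sketch (not part of the patch): with ObjectID re-exported from
# the top-level package above, call sites can write ray.ObjectID instead of
# ray.local_scheduler.ObjectID. The 20-byte id() mirrors the digests used
# throughout this diff. Assumes a local ray.init() is possible.
import ray

ray.init()
oid = ray.put(1)
assert isinstance(oid, ray.ObjectID)
assert len(oid.id()) == 20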
""" actor_id_str = worker.actor_id (driver_id, class_id, class_name, module, pickled_class, - checkpoint_interval, actor_method_names, - actor_method_num_return_vals) = worker.redis_client.hmget( + checkpoint_interval, actor_method_names) = worker.redis_client.hmget( actor_class_key, [ "driver_id", "class_id", "class_name", "module", "class", - "checkpoint_interval", "actor_method_names", - "actor_method_num_return_vals" + "checkpoint_interval", "actor_method_names" ]) - actor_name = class_name.decode("ascii") + class_name = class_name.decode("ascii") module = module.decode("ascii") checkpoint_interval = int(checkpoint_interval) actor_method_names = json.loads(actor_method_names.decode("ascii")) - actor_method_num_return_vals = json.loads( - actor_method_num_return_vals.decode("ascii")) # Create a temporary actor with some temporary methods so that if the actor # fails to be unpickled, the temporary actor can be used (just to produce @@ -297,11 +293,8 @@ class TemporaryActor(object): def temporary_actor_method(*xs): raise Exception("The actor with name {} failed to be imported, and so " - "cannot execute this method".format(actor_name)) + "cannot execute this method".format(class_name)) - # Register the actor method signatures. - register_actor_signatures(worker, driver_id, class_id, class_name, - actor_method_names, actor_method_num_return_vals) # Register the actor method executors. for actor_method_name in actor_method_names: function_id = compute_actor_method_function_id(class_name, @@ -311,8 +304,11 @@ def temporary_actor_method(*xs): actor_method_name, temporary_actor_method, actor_imported=False) - worker.functions[driver_id][function_id] = (actor_method_name, - temporary_executor) + worker.function_execution_info[driver_id][function_id] = ( + ray.worker.FunctionExecutionInfo( + function=temporary_executor, + function_name=actor_method_name, + max_calls=0)) worker.num_task_executions[driver_id][function_id] = 0 try: @@ -347,63 +343,16 @@ def pred(x): class_name, actor_method_name).id() executor = make_actor_method_executor( worker, actor_method_name, actor_method, actor_imported=True) - worker.functions[driver_id][function_id] = (actor_method_name, - executor) + worker.function_execution_info[driver_id][function_id] = ( + ray.worker.FunctionExecutionInfo( + function=executor, + function_name=actor_method_name, + max_calls=0)) # We do not set worker.function_properties[driver_id][function_id] # because we currently do need the actor worker to submit new tasks # for the actor. -def register_actor_signatures(worker, - driver_id, - class_id, - class_name, - actor_method_names, - actor_method_num_return_vals, - actor_creation_resources=None, - actor_method_cpus=None): - """Register an actor's method signatures in the worker. - - Args: - worker: The worker to register the signatures on. - driver_id: The ID of the driver that this actor is associated with. - class_id: The ID of the actor class. - class_name: The name of the actor class. - actor_method_names: The names of the methods to register. - actor_method_num_return_vals: A list of the number of return values for - each of the actor's methods. - actor_creation_resources: The resources required by the actor creation - task. - actor_method_cpus: The number of CPUs required by each actor method. 
- """ - assert len(actor_method_names) == len(actor_method_num_return_vals) - for actor_method_name, num_return_vals in zip( - actor_method_names, actor_method_num_return_vals): - # TODO(rkn): When we create a second actor, we are probably overwriting - # the values from the first actor here. This may or may not be a - # problem. - function_id = compute_actor_method_function_id(class_name, - actor_method_name).id() - worker.function_properties[driver_id][function_id] = ( - # The extra return value is an actor dummy object. - # In the cases where actor_method_cpus is None, that value should - # never be used. - FunctionProperties( - num_return_vals=num_return_vals + 1, - resources={"CPU": actor_method_cpus}, - max_calls=0)) - - if actor_creation_resources is not None: - # Also register the actor creation task. - function_id = compute_actor_creation_function_id(class_id) - worker.function_properties[driver_id][function_id.id()] = ( - # The extra return value is an actor dummy object. - FunctionProperties( - num_return_vals=0 + 1, - resources=actor_creation_resources, - max_calls=0)) - - def publish_actor_class_to_key(key, actor_class_info, worker): """Push an actor class definition to Redis. @@ -424,17 +373,14 @@ def publish_actor_class_to_key(key, actor_class_info, worker): def export_actor_class(class_id, Class, actor_method_names, - actor_method_num_return_vals, checkpoint_interval, - worker): + checkpoint_interval, worker): key = b"ActorClass:" + class_id actor_class_info = { "class_name": Class.__name__, "module": Class.__module__, "class": pickle.dumps(Class), "checkpoint_interval": checkpoint_interval, - "actor_method_names": json.dumps(list(actor_method_names)), - "actor_method_num_return_vals": - json.dumps(actor_method_num_return_vals) + "actor_method_names": json.dumps(list(actor_method_names)) } if worker.mode is None: @@ -455,43 +401,6 @@ def export_actor_class(class_id, Class, actor_method_names, # https://github.com/ray-project/ray/issues/1146. -def export_actor(actor_id, class_id, class_name, actor_method_names, - actor_method_num_return_vals, actor_creation_resources, - actor_method_cpus, worker): - """Export an actor to redis. - - Args: - actor_id (common.ObjectID): The ID of the actor. - class_id (str): A random ID for the actor class. - class_name (str): The actor class name. - actor_method_names (list): A list of the names of this actor's methods. - actor_method_num_return_vals: A list of the number of return values for - each of the actor's methods. - actor_creation_resources: A dictionary mapping resource name to the - quantity of that resource required by the actor. - actor_method_cpus: The number of CPUs required by actor methods. - """ - ray.worker.check_main_thread() - if worker.mode is None: - raise Exception("Actors cannot be created before Ray has been " - "started. You can start Ray with 'ray.init()'.") - - driver_id = worker.task_driver_id.id() - register_actor_signatures( - worker, - driver_id, - class_id, - class_name, - actor_method_names, - actor_method_num_return_vals, - actor_creation_resources=actor_creation_resources, - actor_method_cpus=actor_method_cpus) - - args = [class_id] - function_id = compute_actor_creation_function_id(class_id) - return worker.submit_task(function_id, args, actor_creation_id=actor_id)[0] - - def method(*args, **kwargs): assert len(args) == 0 assert len(kwargs) == 1 @@ -508,9 +417,10 @@ def annotate_method(method): # Create objects to wrap method invocations. 
This is done so that we can # invoke methods with actor.method.remote() instead of actor.method(). class ActorMethod(object): - def __init__(self, actor, method_name): + def __init__(self, actor, method_name, num_return_vals): self._actor = actor self._method_name = method_name + self._num_return_vals = num_return_vals def __call__(self, *args, **kwargs): raise Exception("Actor methods cannot be called directly. Instead " @@ -519,10 +429,17 @@ def __call__(self, *args, **kwargs): self._method_name)) def remote(self, *args, **kwargs): + return self._submit(args, kwargs) + + def _submit(self, args, kwargs, num_return_vals=None): + if num_return_vals is None: + num_return_vals = self._num_return_vals + return self._actor._actor_method_call( self._method_name, args=args, kwargs=kwargs, + num_return_vals=num_return_vals, dependency=self._actor._ray_actor_cursor) @@ -537,23 +454,71 @@ class ActorClass(object): _class_id: The ID of this actor class. _class_name: The name of this class. _checkpoint_interval: The interval at which to checkpoint actor state. - _actor_creation_resources: The default resources required by the actor - creation task. + _num_cpus: The default number of CPUs required by the actor creation + task. + _num_gpus: The default number of GPUs required by the actor creation + task. + _resources: The default resources required by the actor creation task. _actor_method_cpus: The number of CPUs required by actor method tasks. _exported: True if the actor class has been exported and false otherwise. + _actor_methods: The actor methods. + _method_signatures: The signatures of the methods. + _actor_method_names: The names of the actor methods. + _actor_method_num_return_vals: The default number of return values for + each actor method. """ - def __init__(self, modified_class, class_id, checkpoint_interval, - actor_creation_resources, actor_method_cpus): + def __init__(self, modified_class, class_id, checkpoint_interval, num_cpus, + num_gpus, resources, actor_method_cpus): self._modified_class = modified_class self._class_id = class_id - self._class_name = modified_class.__name__.encode("ascii") + self._class_name = modified_class.__name__ self._checkpoint_interval = checkpoint_interval - self._actor_creation_resources = actor_creation_resources + self._num_cpus = num_cpus + self._num_gpus = num_gpus + self._resources = resources self._actor_method_cpus = actor_method_cpus self._exported = False + # Get the actor methods of the given class. + def pred(x): + return (inspect.isfunction(x) or inspect.ismethod(x) + or is_cython(x)) + + self._actor_methods = inspect.getmembers( + self._modified_class, predicate=pred) + # Extract the signatures of each of the methods. This will be used + # to catch some errors if the methods are called with inappropriate + # arguments. + self._method_signatures = dict() + self._actor_method_num_return_vals = dict() + for method_name, method in self._actor_methods: + # Print a warning message if the method signature is not + # supported. We don't raise an exception because if the actor + # inherits from a class that has a method whose signature we + # don't support, there may not be much the user can do about it. + signature.check_signature_supported(method, warn=True) + self._method_signatures[method_name] = signature.extract_signature( + method, ignore_first=True) + + # Set the default number of return values for this method. 
+ if hasattr(method, "__ray_num_return_vals__"): + self._actor_method_num_return_vals[method_name] = ( + method.__ray_num_return_vals__) + else: + self._actor_method_num_return_vals[method_name] = ( + DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS) + + self._actor_method_names = [ + method_name for method_name, _ in self._actor_methods + ] + + def __call__(self, *args, **kwargs): + raise Exception("Actors methods cannot be instantiated directly. " + "Instead of running '{}()', try '{}.remote()'.".format( + self._class_name, self._class_name)) + def remote(self, *args, **kwargs): """Create an actor. @@ -591,72 +556,54 @@ def _submit(self, Returns: A handle to the newly created actor. """ - if ray.worker.global_worker.mode is None: + worker = ray.worker.get_global_worker() + ray.worker.check_main_thread() + if worker.mode is None: raise Exception("Actors cannot be created before ray.init() " "has been called.") - actor_id = ray.local_scheduler.ObjectID(_random_string()) + actor_id = ray.ObjectID(_random_string()) # The actor cursor is a dummy object representing the most recent # actor method invocation. For each subsequent method invocation, # the current cursor should be added as a dependency, and then # updated to reflect the new invocation. actor_cursor = None - # Get the actor methods of the given class. - def pred(x): - return (inspect.isfunction(x) or inspect.ismethod(x) - or is_cython(x)) - - actor_methods = inspect.getmembers( - self._modified_class, predicate=pred) - # Extract the signatures of each of the methods. This will be used - # to catch some errors if the methods are called with inappropriate - # arguments. - method_signatures = dict() - for k, v in actor_methods: - # Print a warning message if the method signature is not - # supported. We don't raise an exception because if the actor - # inherits from a class that has a method whose signature we - # don't support, there may not be much the user can do about it. - signature.check_signature_supported(v, warn=True) - method_signatures[k] = signature.extract_signature( - v, ignore_first=True) - - actor_method_names = [method_name for method_name, _ in actor_methods] - actor_method_num_return_vals = [] - for _, method in actor_methods: - if hasattr(method, "__ray_num_return_vals__"): - actor_method_num_return_vals.append( - method.__ray_num_return_vals__) - else: - actor_method_num_return_vals.append(1) # Do not export the actor class or the actor if run in PYTHON_MODE - # Instead, instantiate the actor locally and add it to - # global_worker's dictionary - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - ray.worker.global_worker.actors[actor_id] = ( - self._modified_class.__new__(self._modified_class)) + # Instead, instantiate the actor locally and add it to the worker's + # dictionary + if worker.mode == ray.PYTHON_MODE: + worker.actors[actor_id] = self._modified_class.__new__( + self._modified_class) else: # Export the actor. 
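# --- Illustrative sketch (not part of the patch): the __ray_num_return_vals__
# attribute read above is what the ray.method decorator attaches, so an actor
# method can declare more than one return object ID and ActorMethod._submit
# picks that default up at call time.
import ray

ray.init()

@ray.remote
class Pair(object):
    @ray.method(num_return_vals=2)
    def split(self):
        return 1, 2

pair = Pair.remote()
left_id, right_id = pair.split.remote()   # two object IDs, not one
assert ray.get([left_id, right_id]) == [1, 2]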
if not self._exported: - export_actor_class( - self._class_id, self._modified_class, actor_method_names, - actor_method_num_return_vals, self._checkpoint_interval, - ray.worker.global_worker) + export_actor_class(self._class_id, self._modified_class, + self._actor_method_names, + self._checkpoint_interval, worker) self._exported = True - actor_cursor = export_actor( - actor_id, self._class_id, self._class_name, actor_method_names, - actor_method_num_return_vals, self._actor_creation_resources, - self._actor_method_cpus, ray.worker.global_worker) + + resources = ray.utils.resources_from_resource_arguments( + self._num_cpus, self._num_gpus, self._resources, num_cpus, + num_gpus, resources) + + creation_args = [self._class_id] + function_id = compute_actor_creation_function_id(self._class_id) + [actor_cursor] = worker.submit_task( + function_id, + creation_args, + actor_creation_id=actor_id, + num_return_vals=1, + resources=resources) # We initialize the actor counter at 1 to account for the actor # creation task. actor_counter = 1 actor_handle = ActorHandle( actor_id, self._class_name, actor_cursor, actor_counter, - actor_method_names, actor_method_num_return_vals, - method_signatures, actor_cursor, self._actor_method_cpus, - ray.worker.global_worker.task_driver_id) + self._actor_method_names, self._method_signatures, + self._actor_method_num_return_vals, actor_cursor, + self._actor_method_cpus, worker.task_driver_id) # Call __init__ as a remote function. if "__init__" in actor_handle._ray_actor_method_names: @@ -703,9 +650,9 @@ class ActorHandle(object): _ray_actor_counter: The number of actor method invocations that we've called so far. _ray_actor_method_names: The names of the actor methods. - _ray_actor_method_num_return_vals: The number of return values for each - actor method. _ray_method_signatures: The signatures of the actor methods. + _ray_method_num_return_vals: The default number of return values for + each method. _ray_class_name: The name of the actor class. _ray_actor_forks: The number of times this handle has been forked. _ray_actor_creation_dummy_object_id: The dummy object ID from the actor @@ -729,8 +676,8 @@ def __init__(self, actor_cursor, actor_counter, actor_method_names, - actor_method_num_return_vals, method_signatures, + method_num_return_vals, actor_creation_dummy_object_id, actor_method_cpus, actor_driver_id, @@ -742,15 +689,15 @@ def __init__(self, self._ray_actor_id = actor_id if self._ray_original_handle: - self._ray_actor_handle_id = ray.local_scheduler.ObjectID( + self._ray_actor_handle_id = ray.ObjectID( ray.worker.NIL_ACTOR_HANDLE_ID) else: self._ray_actor_handle_id = actor_handle_id self._ray_actor_cursor = actor_cursor self._ray_actor_counter = actor_counter self._ray_actor_method_names = actor_method_names - self._ray_actor_method_num_return_vals = actor_method_num_return_vals self._ray_method_signatures = method_signatures + self._ray_method_num_return_vals = method_num_return_vals self._ray_class_name = class_name self._ray_actor_forks = 0 self._ray_actor_creation_dummy_object_id = ( @@ -786,8 +733,11 @@ def _actor_method_call(self, object_ids: A list of object IDs returned by the remote actor method. 
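# --- Illustrative sketch (not part of the patch): actor creation is now a plain
# task submission whose resource request comes from
# ray.utils.resources_from_resource_arguments. With the decorator defaults
# below, the creation task requests {"CPU": 2} for the actor's lifetime and the
# methods request 0 CPUs (assuming the machine exposes at least two CPUs).
import ray

ray.init()

@ray.remote(num_cpus=2)
class Trainer(object):
    def ready(self):
        return True

trainer = Trainer.remote()        # submits the actor creation task
assert ray.get(trainer.ready.remote())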
""" - ray.worker.check_connected() + worker = ray.worker.get_global_worker() + + worker.check_connected() ray.worker.check_main_thread() + function_signature = self._ray_method_signatures[method_name] if args is None: args = [] @@ -797,8 +747,8 @@ def _actor_method_call(self, # Execute functions locally if Ray is run in PYTHON_MODE # Copy args to prevent the function from mutating them. - if ray.worker.global_worker.mode == ray.PYTHON_MODE: - return getattr(ray.worker.global_worker.actors[self._ray_actor_id], + if worker.mode == ray.PYTHON_MODE: + return getattr(worker.actors[self._ray_actor_id], method_name)(*copy.deepcopy(args)) # Add the execution dependency. @@ -812,13 +762,13 @@ def _actor_method_call(self, if self._ray_actor_handle_id is None: actor_handle_id = compute_actor_handle_id_non_forked( self._ray_actor_id, self._ray_previous_actor_handle_id, - ray.worker.global_worker.current_task_id) + worker.current_task_id) else: actor_handle_id = self._ray_actor_handle_id function_id = compute_actor_method_function_id(self._ray_class_name, method_name) - object_ids = ray.worker.global_worker.submit_task( + object_ids = worker.submit_task( function_id, args, actor_id=self._ray_actor_id, @@ -828,7 +778,9 @@ def _actor_method_call(self, actor_creation_dummy_object_id=( self._ray_actor_creation_dummy_object_id), execution_dependencies=execution_dependencies, - num_return_vals=num_return_vals, + # We add one for the dummy return ID. + num_return_vals=num_return_vals + 1, + resources={"CPU": self._ray_actor_method_cpus}, driver_id=self._ray_actor_driver_id) # Update the actor counter and cursor to reflect the most recent # invocation. @@ -860,8 +812,8 @@ def __getattribute__(self, attr): # this was causing cyclic references which were prevent # object deallocation from behaving in a predictable # manner. - actor_method_cls = ActorMethod - return actor_method_cls(self, attr) + return ActorMethod(self, attr, + self._ray_method_num_return_vals[attr]) except AttributeError: pass @@ -880,11 +832,11 @@ def __del__(self): # this is not the right policy. the actor should be alive as long as # there are ANY handles in scope in the process that created the actor, # not just the first one. - if ray.worker.global_worker.connected and self._ray_original_handle: + worker = ray.worker.get_global_worker() + if worker.connected and self._ray_original_handle: # TODO(rkn): Should we be passing in the actor cursor as a # dependency here? - # self.__ray__terminate__.remote() - self._actor_method_call("__ray_terminate__") + self.__ray_terminate__.remote() @property def _actor_id(self): @@ -917,10 +869,10 @@ def _serialization_helper(self, ray_forking): 0, # Reset the actor counter. "actor_method_names": self._ray_actor_method_names, - "actor_method_num_return_vals": - self._ray_actor_method_num_return_vals, "method_signatures": self._ray_method_signatures, + "method_num_return_vals": + self._ray_method_num_return_vals, "actor_creation_dummy_object_id": self._ray_actor_creation_dummy_object_id.id(), "actor_method_cpus": @@ -946,43 +898,36 @@ def _deserialization_helper(self, state, ray_forking): ray_forking: True if this is being called because Ray is forking the actor handle and false if it is being called by pickling. 
""" - ray.worker.check_connected() + worker = ray.worker.get_global_worker() + worker.check_connected() ray.worker.check_main_thread() if state["ray_forking"]: actor_handle_id = compute_actor_handle_id( - ray.local_scheduler.ObjectID( - state["previous_actor_handle_id"]), state["actor_forks"]) + ray.ObjectID(state["previous_actor_handle_id"]), + state["actor_forks"]) else: actor_handle_id = None # This is the driver ID of the driver that owns the actor, not # necessarily the driver that owns this actor handle. - actor_driver_id = ray.local_scheduler.ObjectID( - state["actor_driver_id"]) + actor_driver_id = ray.ObjectID(state["actor_driver_id"]) self.__init__( - ray.local_scheduler.ObjectID(state["actor_id"]), + ray.ObjectID(state["actor_id"]), state["class_name"], - ray.local_scheduler.ObjectID(state["actor_cursor"]), + ray.ObjectID(state["actor_cursor"]), state["actor_counter"], state["actor_method_names"], - state["actor_method_num_return_vals"], state["method_signatures"], - ray.local_scheduler.ObjectID( - state["actor_creation_dummy_object_id"]), + state["method_num_return_vals"], + ray.ObjectID(state["actor_creation_dummy_object_id"]), state["actor_method_cpus"], actor_driver_id, actor_handle_id=actor_handle_id, - previous_actor_handle_id=ray.local_scheduler.ObjectID( + previous_actor_handle_id=ray.ObjectID( state["previous_actor_handle_id"])) - register_actor_signatures( - ray.worker.global_worker, actor_driver_id.id(), None, - self._ray_class_name, self._ray_actor_method_names, - self._ray_actor_method_num_return_vals, None, - self._ray_actor_method_cpus) - def __getstate__(self): """This code path is used by pickling but not by Ray forking.""" return self._serialization_helper(False) @@ -992,7 +937,11 @@ def __setstate__(self, state): return self._deserialization_helper(state, False) -def make_actor(cls, resources, checkpoint_interval, actor_method_cpus): +def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus, + checkpoint_interval): + if checkpoint_interval is None: + checkpoint_interval = -1 + if checkpoint_interval == 0: raise Exception("checkpoint_interval must be greater than 0.") @@ -1000,12 +949,14 @@ def make_actor(cls, resources, checkpoint_interval, actor_method_cpus): # terminating the worker. class Class(cls): def __ray_terminate__(self): - # Disconnect the worker from the local scheduler. The point of this - # is so that when the worker kills itself below, the local - # scheduler won't push an error message to the driver. - ray.worker.global_worker.local_scheduler_client.disconnect() - import os - os._exit(0) + worker = ray.worker.get_global_worker() + if worker.mode != ray.PYTHON_MODE: + # Disconnect the worker from the local scheduler. The point of + # this is so that when the worker kills itself below, the local + # scheduler won't push an error message to the driver. + worker.local_scheduler_client.disconnect() + import os + os._exit(0) def __ray_save_checkpoint__(self): if hasattr(self, "__ray_save__"): @@ -1043,7 +994,7 @@ def __ray_checkpoint__(self): # scheduler has seen. Handle IDs for which no task has yet reached # the local scheduler will not be included, and may not be runnable # on checkpoint resumption. - actor_id = ray.local_scheduler.ObjectID(worker.actor_id) + actor_id = ray.ObjectID(worker.actor_id) frontier = worker.local_scheduler_client.get_actor_frontier( actor_id) # Save the checkpoint in Redis. 
TODO(rkn): Checkpoints @@ -1085,8 +1036,8 @@ def __ray_checkpoint_restore__(self): class_id = _random_string() - return ActorClass(Class, class_id, checkpoint_interval, resources, - actor_method_cpus) + return ActorClass(Class, class_id, checkpoint_interval, num_cpus, num_gpus, + resources, actor_method_cpus) ray.worker.global_worker.fetch_and_register_actor = fetch_and_register_actor diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 2c4bf831f3d1..dbcfa311b259 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -459,7 +459,7 @@ def dtypes(self): if isinstance(self._dtypes_cache, list) and \ isinstance(self._dtypes_cache[0], - ray.local_scheduler.ObjectID): + ray.ObjectID): self._dtypes_cache = pd.concat(ray.get(self._dtypes_cache)) self._dtypes_cache.index = self.columns diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 50df6a22a00d..4e86e05b1db4 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -55,9 +55,9 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None, self._cached_index = False def _get__lengths(self): - if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \ + if isinstance(self._lengths_cache, ray.ObjectID) or \ (isinstance(self._lengths_cache, list) and - isinstance(self._lengths_cache[0], ray.local_scheduler.ObjectID)): + isinstance(self._lengths_cache[0], ray.ObjectID)): self._lengths_cache = ray.get(self._lengths_cache) return self._lengths_cache @@ -72,7 +72,7 @@ def _get__coord_df(self): Since we may have had an index set before our coord_df was materialized, we'll have to apply it to the newly materialized df """ - if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): + if isinstance(self._coord_df_cache, ray.ObjectID): self._coord_df_cache = ray.get(self._coord_df_cache) if self._cached_index: self._coord_df_cache.index = self._index_cache @@ -89,7 +89,7 @@ def _set__coord_df(self, coord_df): If the set _IndexMetadata is an OID instead (due to a copy or whatever reason), we fall back relying on `_index_cache`. 
""" - if not isinstance(coord_df, ray.local_scheduler.ObjectID): + if not isinstance(coord_df, ray.ObjectID): self._index_cache = coord_df.index self._coord_df_cache = coord_df @@ -102,7 +102,7 @@ def _get_index(self): _IndexMetadata object without a specified `index` parameter (See the _IndexMetadata constructor for more details) """ - if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): + if isinstance(self._coord_df_cache, ray.ObjectID): return self._index_cache else: return self._coord_df_cache.index @@ -119,7 +119,7 @@ def _set_index(self, new_index): assert len(new_index) == len(self) self._index_cache = new_index - if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): + if isinstance(self._coord_df_cache, ray.ObjectID): self._cached_index = True else: self._coord_df_cache.index = new_index @@ -140,7 +140,7 @@ def _get_index_cache(self): if self._index_cache_validator is None: self._index_cache_validator = pd.RangeIndex(len(self)) elif isinstance(self._index_cache_validator, - ray.local_scheduler.ObjectID): + ray.ObjectID): self._index_cache_validator = ray.get(self._index_cache_validator) return self._index_cache_validator @@ -296,11 +296,11 @@ def squeeze(self, partition, index_within_partition): def copy(self): # TODO: Investigate copy-on-write wrapper for metadata objects coord_df_copy = self._coord_df_cache - if not isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): + if not isinstance(self._coord_df_cache, ray.ObjectID): coord_df_copy = self._coord_df_cache.copy() lengths_copy = self._lengths_cache - if not isinstance(self._lengths_cache, ray.local_scheduler.ObjectID): + if not isinstance(self._lengths_cache, ray.ObjectID): lengths_copy = self._lengths_cache.copy() index_copy = self._index_cache diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 964b0f71e290..0cea70b9ecee 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -184,8 +184,8 @@ def _object_table(self, object_id): A dictionary with information about the object ID in question. """ # Allow the argument to be either an ObjectID or a hex string. - if not isinstance(object_id, ray.local_scheduler.ObjectID): - object_id = ray.local_scheduler.ObjectID(hex_to_binary(object_id)) + if not isinstance(object_id, ray.ObjectID): + object_id = ray.ObjectID(hex_to_binary(object_id)) # Return information about a single object ID. 
object_locations = self._execute_command(object_id, @@ -297,7 +297,7 @@ def _task_table(self, task_id): TaskExecutionDependencies.GetRootAsTaskExecutionDependencies( task_table_message.ExecutionDependencies(), 0)) execution_dependencies = [ - ray.local_scheduler.ObjectID( + ray.ObjectID( execution_dependencies_message.ExecutionDependencies(i)) for i in range( execution_dependencies_message.ExecutionDependenciesLength()) @@ -335,7 +335,7 @@ def task_table(self, task_id=None): """ self._check_connected() if task_id is not None: - task_id = ray.local_scheduler.ObjectID(hex_to_binary(task_id)) + task_id = ray.ObjectID(hex_to_binary(task_id)) return self._task_table(task_id) else: task_table_keys = self._keys(TASK_PREFIX + "*") @@ -343,7 +343,7 @@ def task_table(self, task_id=None): for key in task_table_keys: task_id_binary = key[len(TASK_PREFIX):] results[binary_to_hex(task_id_binary)] = self._task_table( - ray.local_scheduler.ObjectID(task_id_binary)) + ray.ObjectID(task_id_binary)) return results def function_table(self, function_id=None): @@ -628,8 +628,7 @@ def micros_rel(ts): # modify it in place since we will use the original values later. total_info = copy.copy(task_table[task_id]["TaskSpec"]) total_info["Args"] = [ - oid.hex() - if isinstance(oid, ray.local_scheduler.ObjectID) else oid + oid.hex() if isinstance(oid, ray.ObjectID) else oid for oid in task_t_info["TaskSpec"]["Args"] ] total_info["ReturnObjectIDs"] = [ @@ -855,7 +854,7 @@ def micros_rel(ts): args = task_table[task_id]["TaskSpec"]["Args"] for arg in args: # Don't visualize arguments that are not object IDs. - if isinstance(arg, ray.local_scheduler.ObjectID): + if isinstance(arg, ray.ObjectID): object_info = self._object_table(arg) # Don't visualize objects that were created by calls to # put. diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py new file mode 100644 index 000000000000..4e09e4016b1d --- /dev/null +++ b/python/ray/remote_function.py @@ -0,0 +1,158 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import copy +import hashlib +import inspect + +import ray.signature + +# Default parameters for remote functions. +DEFAULT_REMOTE_FUNCTION_CPUS = 1 +DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS = 1 +DEFAULT_REMOTE_FUNCTION_MAX_CALLS = 0 + + +def in_ipython(): + """Return true if we are in an IPython interpreter and false otherwise.""" + try: + __IPYTHON__ + return True + except NameError: + return False + + +def compute_function_id(function): + """Compute an function ID for a function. + + Args: + func: The actual function. + + Returns: + This returns the function ID. + """ + function_id_hash = hashlib.sha1() + # Include the function module and name in the hash. + function_id_hash.update(function.__module__.encode("ascii")) + function_id_hash.update(function.__name__.encode("ascii")) + # If we are running a script or are in IPython, include the source code in + # the hash. If we are in a regular Python interpreter we skip this part + # because the source code is not accessible. If the function is a built-in + # (e.g., Cython), the source code is not accessible. + import __main__ as main + if (hasattr(main, "__file__") or in_ipython()) \ + and inspect.isfunction(function): + function_id_hash.update(inspect.getsource(function).encode("ascii")) + # Compute the function ID. 
+ function_id = function_id_hash.digest() + assert len(function_id) == 20 + function_id = ray.ObjectID(function_id) + + return function_id + + +class RemoteFunction(object): + """A remote function. + + This is a decorated function. It can be used to spawn tasks. + + Attributes: + _function: The original function. + _function_id: The ID of the function. + _function_name: The module and function name. + _num_cpus: The default number of CPUs to use for invocations of this + remote function. + _num_gpus: The default number of GPUs to use for invocations of this + remote function. + _resources: The default custom resource requirements for invocations of + this remote function. + _num_return_vals: The default number of return values for invocations + of this remote function. + _max_calls: The number of times a worker can execute this function + before executing. + _function_signature: The function signature. + """ + + def __init__(self, function, num_cpus, num_gpus, resources, + num_return_vals, max_calls): + self._function = function + # TODO(rkn): We store the function ID as a string, so that + # RemoteFunction objects can be pickled. We should undo this when + # we allow ObjectIDs to be pickled. + self._function_id = compute_function_id(self._function).id() + self._function_name = ( + self._function.__module__ + '.' + self._function.__name__) + self._num_cpus = (DEFAULT_REMOTE_FUNCTION_CPUS + if num_cpus is None else num_cpus) + self._num_gpus = num_gpus + self._resources = resources + self._num_return_vals = (DEFAULT_REMOTE_FUNCTION_NUM_RETURN_VALS if + num_return_vals is None else num_return_vals) + self._max_calls = (DEFAULT_REMOTE_FUNCTION_MAX_CALLS + if max_calls is None else max_calls) + + ray.signature.check_signature_supported(self._function) + self._function_signature = ray.signature.extract_signature( + self._function) + + # # Export the function. + worker = ray.worker.get_global_worker() + if worker.mode in [ray.worker.SCRIPT_MODE, ray.worker.SILENT_MODE]: + self._export() + elif worker.mode is None: + worker.cached_remote_functions_and_actors.append( + ("remote_function", self)) + + def __call__(self, *args, **kwargs): + raise Exception("Remote functions cannot be called directly. Instead " + "of running '{}()', try '{}.remote()'.".format( + self._function_name, self._function_name)) + + def remote(self, *args, **kwargs): + """This runs immediately when a remote function is called.""" + return self._submit(args=args, kwargs=kwargs) + + def _submit(self, + args=None, + kwargs=None, + num_return_vals=None, + num_cpus=None, + num_gpus=None, + resources=None): + """An experimental alternate way to submit remote functions.""" + worker = ray.worker.get_global_worker() + worker.check_connected() + ray.worker.check_main_thread() + kwargs = {} if kwargs is None else kwargs + args = ray.signature.extend_args(self._function_signature, args, + kwargs) + + if num_return_vals is None: + num_return_vals = self._num_return_vals + + resources = ray.utils.resources_from_resource_arguments( + self._num_cpus, self._num_gpus, self._resources, num_cpus, + num_gpus, resources) + if worker.mode == ray.worker.PYTHON_MODE: + # In PYTHON_MODE, remote calls simply execute the function. + # We copy the arguments to prevent the function call from + # mutating them and to match the usual behavior of + # immutable remote objects. 
+ result = self._function(*copy.deepcopy(args)) + return result + object_ids = worker.submit_task( + ray.ObjectID(self._function_id), + args, + num_return_vals=num_return_vals, + resources=resources) + if len(object_ids) == 1: + return object_ids[0] + elif len(object_ids) > 1: + return object_ids + + def _export(self): + worker = ray.worker.get_global_worker() + worker.export_remote_function( + ray.ObjectID(self._function_id), self._function_name, + self._function, self._max_calls, self) diff --git a/python/ray/utils.py b/python/ray/utils.py index 0ef47daf971f..9fa3a4fe165f 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -3,7 +3,6 @@ from __future__ import print_function import binascii -import collections import hashlib import numpy as np import os @@ -125,7 +124,7 @@ def decode(byte_str): def binary_to_object_id(binary_object_id): - return ray.local_scheduler.ObjectID(binary_object_id) + return ray.ObjectID(binary_object_id) def binary_to_hex(identifier): @@ -139,11 +138,6 @@ def hex_to_binary(hex_identifier): return binascii.unhexlify(hex_identifier) -FunctionProperties = collections.namedtuple( - "FunctionProperties", ["num_return_vals", "resources", "max_calls"]) -"""FunctionProperties: A named tuple storing remote functions information.""" - - def get_cuda_visible_devices(): """Get the device IDs in the CUDA_VISIBLE_DEVICES environment variable. @@ -169,3 +163,48 @@ def set_cuda_visible_devices(gpu_ids): gpu_ids: This is a list of integers representing GPU IDs. """ os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in gpu_ids]) + + +def resources_from_resource_arguments(default_num_cpus, default_num_gpus, + default_resources, runtime_num_cpus, + runtime_num_gpus, runtime_resources): + """Determine a task's resource requirements. + + Args: + default_num_cpus: The default number of CPUs required by this function + or actor method. + default_num_gpus: The default number of GPUs required by this function + or actor method. + default_resources: The default custom resources required by this + function or actor method. + runtime_num_cpus: The number of CPUs requested when the task was + invoked. + runtime_num_gpus: The number of GPUs requested when the task was + invoked. + runtime_resources: The custom resources requested when the task was + invoked. + + Returns: + A dictionary of the resource requirements for the task. 
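# --- Illustrative sketch (not part of the patch): RemoteFunction.remote()
# simply forwards to _submit(), which also accepts per-call overrides for the
# number of return values and the resource request.
import ray

ray.init()

@ray.remote
def divmod_task(a, b):
    return a // b, a % b

# Default invocation: a single return object ID holding the tuple.
single_id = divmod_task.remote(7, 2)
assert ray.get(single_id) == (3, 1)

# Experimental override: two return object IDs and an explicit CPU request.
q_id, r_id = divmod_task._submit(args=[7, 2], num_return_vals=2, num_cpus=1)
assert ray.get([q_id, r_id]) == [3, 1]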
+ """ + if runtime_resources is not None: + resources = runtime_resources.copy() + elif default_resources is not None: + resources = default_resources.copy() + else: + resources = {} + + if "CPU" in resources or "GPU" in resources: + raise ValueError("The resources dictionary must not " + "contain the key 'CPU' or 'GPU'") + + assert default_num_cpus is not None + resources["CPU"] = (default_num_cpus + if runtime_num_cpus is None else runtime_num_cpus) + + if runtime_num_gpus is not None: + resources["GPU"] = runtime_num_gpus + elif default_num_gpus is not None: + resources["GPU"] = default_num_gpus + + return resources diff --git a/python/ray/worker.py b/python/ray/worker.py index 39d4d93a6a4f..3b4495d25cfd 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -5,7 +5,6 @@ import atexit import collections import colorama -import copy import hashlib import inspect import json @@ -23,13 +22,13 @@ import pyarrow.plasma as plasma import ray.cloudpickle as pickle import ray.experimental.state as state +import ray.remote_function import ray.serialization as serialization import ray.services as services -import ray.signature as signature +import ray.signature import ray.local_scheduler import ray.plasma -from ray.utils import (FunctionProperties, random_string, binary_to_hex, - is_cython) +from ray.utils import random_string, binary_to_hex, is_cython # Import flatbuffer bindings. from ray.core.generated.ClientTableData import ClientTableData @@ -63,9 +62,6 @@ # This must be kept in sync with the `scheduling_state` enum in common/task.h. TASK_STATUS_RUNNING = 8 -# Default resource requirements for remote functions. -DEFAULT_REMOTE_FUNCTION_CPUS = 1 -DEFAULT_REMOTE_FUNCTION_GPUS = 0 # Default resource requirements for actors when no resource requirements are # specified. DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE = 1 @@ -74,15 +70,6 @@ # specified. DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE = 0 DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE = 1 -DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE = 0 - - -class FunctionID(object): - def __init__(self, function_id): - self.function_id = function_id - - def id(self): - return self.function_id class RayTaskError(Exception): @@ -182,6 +169,11 @@ def __str__(self): self.task_error)) +FunctionExecutionInfo = collections.namedtuple( + "FunctionExecutionInfo", ["function", "function_name", "max_calls"]) +"""FunctionExecutionInfo: A named tuple storing remote function information.""" + + class Worker(object): """A class used to define the control flow of a worker process. @@ -190,9 +182,10 @@ class Worker(object): functions outside of this class are considered exposed. Attributes: - functions (Dict[str, Callable]): A dictionary mapping the name of a - remote function to the remote function itself. This is the set of - remote functions that can be executed by this worker. + function_execution_info (Dict[str, FunctionExecutionInfo]): A + dictionary mapping the name of a remote function to the remote + function itself. This is the set of remote functions that can be + executed by this worker. connected (bool): True if Ray has been started and False otherwise. mode: The mode of the worker. One of SCRIPT_MODE, PYTHON_MODE, SILENT_MODE, and WORKER_MODE. 
@@ -208,20 +201,12 @@ class Worker(object): def __init__(self): """Initialize a Worker object.""" - # The functions field is a dictionary that maps a driver ID to a - # dictionary of functions that have been registered for that driver - # (this inner dictionary maps function IDs to a tuple of the function - # name and the function itself). This should only be used on workers - # that execute remote functions. - self.functions = collections.defaultdict(lambda: {}) - # The function_properties field is a dictionary that maps a driver ID - # to a dictionary of functions that have been registered for that - # driver (this inner dictionary maps function IDs to a tuple of the - # number of values returned by that function, the number of CPUs - # required by that function, and the number of GPUs required by that - # function). This is used when submitting a function (which can be done - # both on workers and on drivers). - self.function_properties = collections.defaultdict(lambda: {}) + # This field is a dictionary that maps a driver ID to a dictionary of + # functions (and information about those functions) that have been + # registered for that driver (this inner dictionary maps function IDs + # to a FunctionExecutionInfo object. This should only be used on + # workers that execute remote functions. + self.function_execution_info = collections.defaultdict(lambda: {}) # This is a dictionary mapping driver ID to a dictionary that maps # remote function IDs for that driver to a counter of the number of # times that remote function has been executed on this worker. The @@ -248,6 +233,16 @@ def __init__(self): # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() + def check_connected(self): + """Check if the worker is connected. + + Raises: + Exception: An exception is raised if the worker is not connected. + """ + if not self.connected: + raise RayConnectionError("Ray has not been started yet. You can " + "start Ray with 'ray.init()'.") + def set_mode(self, mode): """Set the mode of the worker. @@ -356,7 +351,7 @@ def put_object(self, object_id, value): full. """ # Make sure that the value is not an object ID. - if isinstance(value, ray.local_scheduler.ObjectID): + if isinstance(value, ray.ObjectID): raise Exception("Calling 'put' on an ObjectID is not allowed " "(similarly, returning an ObjectID from a remote " "function is not allowed). If you really want to " @@ -438,7 +433,7 @@ def get_object(self, object_ids): """ # Make sure that the values are object IDs. for object_id in object_ids: - if not isinstance(object_id, ray.local_scheduler.ObjectID): + if not isinstance(object_id, ray.ObjectID): raise Exception("Attempting to call `get` on the value {}, " "which is not an ObjectID.".format(object_id)) # Do an initial fetch for remote objects. We divide the fetch into @@ -518,8 +513,6 @@ def submit_task(self, actor_creation_dummy_object_id=None, execution_dependencies=None, num_return_vals=None, - num_cpus=None, - num_gpus=None, resources=None, driver_id=None): """Submit a remote task to the scheduler. @@ -545,8 +538,6 @@ def submit_task(self, execution_dependencies: The execution dependencies for this task. num_return_vals: The number of return values this function should have. - num_cpus: The number of CPUs required by this task. - num_gpus: The number of GPUs required by this task. resources: The resource requirements for this task. driver_id: The ID of the relevant driver. 
This is almost always the driver ID of the driver that is currently running. However, in @@ -561,24 +552,22 @@ def submit_task(self, check_main_thread() if actor_id is None: assert actor_handle_id is None - actor_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) - actor_handle_id = ray.local_scheduler.ObjectID( - NIL_ACTOR_HANDLE_ID) + actor_id = ray.ObjectID(NIL_ACTOR_ID) + actor_handle_id = ray.ObjectID(NIL_ACTOR_HANDLE_ID) else: assert actor_handle_id is not None if actor_creation_id is None: - actor_creation_id = ray.local_scheduler.ObjectID(NIL_ACTOR_ID) + actor_creation_id = ray.ObjectID(NIL_ACTOR_ID) if actor_creation_dummy_object_id is None: - actor_creation_dummy_object_id = ( - ray.local_scheduler.ObjectID(NIL_ID)) + actor_creation_dummy_object_id = (ray.ObjectID(NIL_ID)) # Put large or complex arguments that are passed by value in the # object store first. args_for_local_scheduler = [] for arg in args: - if isinstance(arg, ray.local_scheduler.ObjectID): + if isinstance(arg, ray.ObjectID): args_for_local_scheduler.append(arg) elif ray.local_scheduler.check_simple_value(arg): args_for_local_scheduler.append(arg) @@ -592,26 +581,12 @@ def submit_task(self, if driver_id is None: driver_id = self.task_driver_id - # Look up the various function properties. - function_properties = self.function_properties[driver_id.id()][ - function_id.id()] - - if num_return_vals is None: - num_return_vals = function_properties.num_return_vals - - if resources is None and num_cpus is None and num_gpus is None: - resources = function_properties.resources - else: - resources = {} if resources is None else resources - if "CPU" in resources or "GPU" in resources: - raise ValueError("The resources dictionary must not " - "contain the keys 'CPU' or 'GPU'") - resources["CPU"] = num_cpus - resources["GPU"] = num_gpus + if resources is None: + raise ValueError("The resources dictionary is required.") # Submit the task to local scheduler. task = ray.local_scheduler.Task( - driver_id, ray.local_scheduler.ObjectID( + driver_id, ray.ObjectID( function_id.id()), args_for_local_scheduler, num_return_vals, self.current_task_id, self.task_index, actor_creation_id, actor_creation_dummy_object_id, actor_id, @@ -624,6 +599,55 @@ def submit_task(self, return task.returns() + def export_remote_function(self, function_id, function_name, function, + max_calls, decorated_function): + """Export a remote function. + + Args: + function_id: The ID of the function. + function_name: The name of the function. + function: The raw undecorated function to export. + max_calls: The maximum number of times a given worker can execute + this function before exiting. + decorated_function: The decorated function (this is used to enable + the remote function to recursively call itself). + """ + check_main_thread() + if self.mode not in [SCRIPT_MODE, SILENT_MODE]: + raise Exception("export_remote_function can only be called on a " + "driver.") + + key = (b"RemoteFunction:" + self.task_driver_id.id() + b":" + + function_id.id()) + + # Work around limitations of Python pickling. 
+ function_name_global_valid = function.__name__ in function.__globals__ + function_name_global_value = function.__globals__.get( + function.__name__) + # Allow the function to reference itself as a global variable + if not is_cython(function): + function.__globals__[function.__name__] = decorated_function + try: + pickled_function = pickle.dumps(function) + finally: + # Undo our changes + if function_name_global_valid: + function.__globals__[function.__name__] = ( + function_name_global_value) + else: + del function.__globals__[function.__name__] + + self.redis_client.hmset( + key, { + "driver_id": self.task_driver_id.id(), + "function_id": function_id.id(), + "name": function_name, + "module": function.__module__, + "function": pickled_function, + "max_calls": max_calls + }) + self.redis_client.rpush("Exports", key) + def run_function_on_all_workers(self, function): """Run arbitrary code on all of the workers. @@ -697,7 +721,8 @@ def _wait_for_function(self, function_id, driver_id, timeout=10): while True: with self.lock: if (self.actor_id == NIL_ACTOR_ID - and (function_id.id() in self.functions[driver_id])): + and (function_id.id() in + self.function_execution_info[driver_id])): break elif self.actor_id != NIL_ACTOR_ID and ( self.actor_id in self.actors): @@ -741,7 +766,7 @@ def _get_arguments_for_execution(self, function_name, serialized_args): """ arguments = [] for (i, arg) in enumerate(serialized_args): - if isinstance(arg, ray.local_scheduler.ObjectID): + if isinstance(arg, ray.ObjectID): # get the object from the local object store argument = self.get_object([arg])[0] if isinstance(argument, RayTaskError): @@ -798,7 +823,6 @@ def _process_task(self, task): # message to the correct driver. self.task_driver_id = task.driver_id() self.current_task_id = task.task_id() - self.current_function_id = task.function_id().id() self.task_index = 0 self.put_index = 1 function_id = task.function_id() @@ -806,8 +830,10 @@ def _process_task(self, task): return_object_ids = task.returns() if task.actor_id().id() != NIL_ACTOR_ID: dummy_return_id = return_object_ids.pop() - function_name, function_executor = ( - self.functions[self.task_driver_id.id()][function_id.id()]) + function_executor = self.function_execution_info[ + self.task_driver_id.id()][function_id.id()].function + function_name = self.function_execution_info[self.task_driver_id.id()][ + function_id.id()].function_name # Get task arguments from the object store. try: @@ -829,7 +855,7 @@ def _process_task(self, task): try: with log_span("ray:task:execute", worker=self): if task.actor_id().id() == NIL_ACTOR_ID: - outputs = function_executor.executor(arguments) + outputs = function_executor(*arguments) else: outputs = function_executor( dummy_return_id, self.actors[task.actor_id().id()], @@ -862,8 +888,8 @@ def _process_task(self, task): def _handle_process_task_failure(self, function_id, return_object_ids, error, backtrace): - function_name, _ = self.functions[self.task_driver_id.id()][ - function_id.id()] + function_name = self.function_execution_info[self.task_driver_id.id()][ + function_id.id()].function_name failure_object = RayTaskError(function_name, error, backtrace) failure_objects = [ failure_object for _ in range(len(return_object_ids)) @@ -902,7 +928,7 @@ def _become_actor(self, task): time.sleep(0.001) with self.lock: - self.fetch_and_register_actor(key, task.required_resources(), self) + self.fetch_and_register_actor(key, self) def _wait_for_and_process_task(self, task): """Wait for a task to be ready and process the task. 
@@ -911,11 +937,11 @@ def _wait_for_and_process_task(self, task): task: The task to execute. """ function_id = task.function_id() + driver_id = task.driver_id().id() # TODO(rkn): It would be preferable for actor creation tasks to share # more of the code path with regular task execution. - if (task.actor_creation_id() != - ray.local_scheduler.ObjectID(NIL_ACTOR_ID)): + if (task.actor_creation_id() != ray.ObjectID(NIL_ACTOR_ID)): self._become_actor(task) return @@ -923,7 +949,7 @@ def _wait_for_and_process_task(self, task): # on this worker. We will push warnings to the user if we spend too # long in this loop. with log_span("ray:wait_for_function", worker=self): - self._wait_for_function(function_id, task.driver_id().id()) + self._wait_for_function(function_id, driver_id) # Execute the task. # TODO(rkn): Consider acquiring this lock with a timeout and pushing a @@ -934,8 +960,8 @@ def _wait_for_and_process_task(self, task): with self.lock: log(event_type="ray:acquire_lock", kind=LOG_SPAN_END, worker=self) - function_name, _ = ( - self.functions[task.driver_id().id()][function_id.id()]) + function_name = (self.function_execution_info[driver_id][ + function_id.id()]).function_name contents = { "function_name": function_name, "task_id": task.task_id().hex(), @@ -948,14 +974,13 @@ def _wait_for_and_process_task(self, task): flush_log() # Increase the task execution counter. - (self.num_task_executions[task.driver_id().id()][function_id.id()] - ) += 1 + self.num_task_executions[driver_id][function_id.id()] += 1 - reached_max_executions = (self.num_task_executions[task.driver_id().id( - )][function_id.id()] == self.function_properties[task.driver_id().id()] - [function_id.id()].max_calls) + reached_max_executions = ( + self.num_task_executions[driver_id][function_id.id()] == self. + function_execution_info[driver_id][function_id.id()].max_calls) if reached_max_executions: - ray.worker.global_worker.local_scheduler_client.disconnect() + self.local_scheduler_client.disconnect() os._exit(0) def _get_next_task_from_local_scheduler(self): @@ -1069,18 +1094,6 @@ def check_main_thread(): .format(threading.current_thread().getName())) -def check_connected(worker=global_worker): - """Check if the worker is connected. - - Raises: - Exception: An exception is raised if the worker is not connected. - """ - if not worker.connected: - raise RayConnectionError("This command cannot be called before Ray " - "has been started. You can start Ray with " - "'ray.init()'.") - - def print_failed_task(task_status): """Print information about failed tasks. @@ -1114,7 +1127,7 @@ def error_applies_to_driver(error_key, worker=global_worker): def error_info(worker=global_worker): """Return information about failed tasks.""" - check_connected(worker) + worker.check_connected() check_main_thread() error_keys = worker.redis_client.lrange("ErrorKeys", 0, -1) errors = [] @@ -1143,13 +1156,13 @@ def object_id_custom_serializer(obj): return obj.id() def object_id_custom_deserializer(serialized_obj): - return ray.local_scheduler.ObjectID(serialized_obj) + return ray.ObjectID(serialized_obj) # We register this serializer on each worker instead of calling # register_custom_serializer from the driver so that isinstance still # works. 
worker.serialization_context.register_type( - ray.local_scheduler.ObjectID, + ray.ObjectID, "ray.ObjectID", pickle=False, custom_serializer=object_id_custom_serializer, @@ -1786,12 +1799,9 @@ def fetch_and_register_remote_function(key, worker=global_worker): "driver_id", "function_id", "name", "function", "num_return_vals", "module", "resources", "max_calls" ]) - function_id = ray.local_scheduler.ObjectID(function_id_str) + function_id = ray.ObjectID(function_id_str) function_name = function_name.decode("ascii") - function_properties = FunctionProperties( - num_return_vals=int(num_return_vals), - resources=json.loads(resources.decode("ascii")), - max_calls=int(max_calls)) + max_calls = int(max_calls) module = module.decode("ascii") # This is a placeholder in case the function can't be unpickled. This will @@ -1799,11 +1809,9 @@ def fetch_and_register_remote_function(key, worker=global_worker): def f(): raise Exception("This function was not imported properly.") - remote_f_placeholder = remote(function_id=function_id)(lambda *xs: f()) - worker.functions[driver_id][function_id.id()] = (function_name, - remote_f_placeholder) - worker.function_properties[driver_id][function_id.id()] = ( - function_properties) + worker.function_execution_info[driver_id][function_id.id()] = ( + FunctionExecutionInfo( + function=f, function_name=function_name, max_calls=max_calls)) worker.num_task_executions[driver_id][function_id.id()] = 0 try: @@ -1825,8 +1833,11 @@ def f(): else: # TODO(rkn): Why is the below line necessary? function.__module__ = module - worker.functions[driver_id][function_id.id()] = ( - function_name, remote(function_id=function_id)(function)) + worker.function_execution_info[driver_id][function_id.id()] = ( + FunctionExecutionInfo( + function=function, + function_name=function_name, + max_calls=max_calls)) # Add the function to the function table. worker.redis_client.rpush(b"FunctionTable:" + function_id.id(), worker.worker_id) @@ -1973,6 +1984,14 @@ def connect(info, assert worker.cached_remote_functions_and_actors is not None, error_message # Initialize some fields. worker.worker_id = random_string() + + # When tasks are executed on remote workers in the context of multiple + # drivers, the task driver ID is used to keep track of which driver is + # responsible for the task so that error messages will be propagated to + # the correct driver. + if mode != WORKER_MODE: + worker.task_driver_id = ray.ObjectID(worker.worker_id) + # All workers start out as non-actors. A worker can be turned into an actor # after it is created. worker.actor_id = NIL_ACTOR_ID @@ -2102,13 +2121,7 @@ def connect(info, else: # Try to use true randomness. np.random.seed(None) - worker.current_task_id = ray.local_scheduler.ObjectID( - np.random.bytes(20)) - # When tasks are executed on remote workers in the context of multiple - # drivers, the task driver ID is used to keep track of which driver is - # responsible for the task so that error messages will be propagated to - # the correct driver. - worker.task_driver_id = ray.local_scheduler.ObjectID(worker.worker_id) + worker.current_task_id = ray.ObjectID(np.random.bytes(20)) # Reset the state of the numpy random number generator. np.random.set_state(numpy_state) # Set other fields needed for computing task IDs. 
@@ -2124,14 +2137,11 @@ def connect(info, nil_actor_counter = 0 driver_task = ray.local_scheduler.Task( - worker.task_driver_id, - ray.local_scheduler.ObjectID(NIL_FUNCTION_ID), [], 0, + worker.task_driver_id, ray.ObjectID(NIL_FUNCTION_ID), [], 0, worker.current_task_id, worker.task_index, - ray.local_scheduler.ObjectID(NIL_ACTOR_ID), - ray.local_scheduler.ObjectID(NIL_ACTOR_ID), - ray.local_scheduler.ObjectID(NIL_ACTOR_ID), - ray.local_scheduler.ObjectID(NIL_ACTOR_ID), nil_actor_counter, - False, [], {"CPU": 0}, worker.use_raylet) + ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), + ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), + nil_actor_counter, False, [], {"CPU": 0}, worker.use_raylet) global_state._execute_command( driver_task.task_id(), "RAY.TASK_TABLE_ADD", driver_task.task_id().id(), @@ -2194,11 +2204,7 @@ def connect(info, # Export cached remote functions to the workers. for cached_type, info in worker.cached_remote_functions_and_actors: if cached_type == "remote_function": - (function_id, func_name, func, func_invoker, - function_properties) = info - export_remote_function(function_id, func_name, func, - func_invoker, function_properties, - worker) + info._export() elif cached_type == "actor": (key, actor_class_info) = info ray.actor.publish_actor_class_to_key(key, actor_class_info, @@ -2450,7 +2456,7 @@ def get(object_ids, worker=global_worker): Returns: A Python object or a list of Python objects. """ - check_connected(worker) + worker.check_connected() with log_span("ray:get", worker=worker): check_main_thread() @@ -2483,7 +2489,7 @@ def put(value, worker=global_worker): Returns: The object ID assigned to this value. """ - check_connected(worker) + worker.check_connected() with log_span("ray:put", worker=worker): check_main_thread() @@ -2524,7 +2530,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): print("plasma_client.wait has not been implemented yet") return - if isinstance(object_ids, ray.local_scheduler.ObjectID): + if isinstance(object_ids, ray.ObjectID): raise TypeError( "wait() expected a list of ObjectID, got a single ObjectID") @@ -2534,12 +2540,12 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): if worker.mode != PYTHON_MODE: for object_id in object_ids: - if not isinstance(object_id, ray.local_scheduler.ObjectID): + if not isinstance(object_id, ray.ObjectID): raise TypeError("wait() expected a list of ObjectID, " "got list containing {}".format( type(object_id))) - check_connected(worker) + worker.check_connected() with log_span("ray:wait", worker=worker): check_main_thread() @@ -2561,27 +2567,14 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): ready_ids, remaining_ids = worker.plasma_client.wait( object_id_strs, timeout, num_returns) ready_ids = [ - ray.local_scheduler.ObjectID(object_id.binary()) - for object_id in ready_ids + ray.ObjectID(object_id.binary()) for object_id in ready_ids ] remaining_ids = [ - ray.local_scheduler.ObjectID(object_id.binary()) - for object_id in remaining_ids + ray.ObjectID(object_id.binary()) for object_id in remaining_ids ] return ready_ids, remaining_ids -def _submit_task(function_id, *args, **kwargs): - """This is a wrapper around worker.submit_task. - - We use this wrapper so that in the remote decorator, we can call - _submit_task instead of worker.submit_task. The difference is that when we - attempt to serialize remote functions, we don't attempt to serialize the - worker object, which cannot be serialized. 
- """ - return global_worker.submit_task(function_id, *args, **kwargs) - - def _mode(worker=global_worker): """This is a wrapper around worker.mode. @@ -2593,278 +2586,104 @@ def _mode(worker=global_worker): return worker.mode -def export_remote_function(function_id, - func_name, - func, - func_invoker, - function_properties, - worker=global_worker): - check_main_thread() - if _mode(worker) not in [SCRIPT_MODE, SILENT_MODE]: - raise Exception("export_remote_function can only be called on a " - "driver.") - - worker.function_properties[worker.task_driver_id.id()][ - function_id.id()] = function_properties - task_driver_id = worker.task_driver_id - key = b"RemoteFunction:" + task_driver_id.id() + b":" + function_id.id() - - # Work around limitations of Python pickling. - func_name_global_valid = func.__name__ in func.__globals__ - func_name_global_value = func.__globals__.get(func.__name__) - # Allow the function to reference itself as a global variable - if not is_cython(func): - func.__globals__[func.__name__] = func_invoker - try: - pickled_func = pickle.dumps(func) - finally: - # Undo our changes - if func_name_global_valid: - func.__globals__[func.__name__] = func_name_global_value - else: - del func.__globals__[func.__name__] - - worker.redis_client.hmset( - key, { - "driver_id": worker.task_driver_id.id(), - "function_id": function_id.id(), - "name": func_name, - "module": func.__module__, - "function": pickled_func, - "num_return_vals": function_properties.num_return_vals, - "resources": json.dumps(function_properties.resources), - "max_calls": function_properties.max_calls - }) - worker.redis_client.rpush("Exports", key) - - -def in_ipython(): - """Return true if we are in an IPython interpreter and false otherwise.""" - try: - __IPYTHON__ - return True - except NameError: - return False - +def get_global_worker(): + return global_worker + + +def make_decorator(num_return_vals=None, + num_cpus=None, + num_gpus=None, + resources=None, + max_calls=None, + checkpoint_interval=None, + worker=None): + def decorator(function_or_class): + if (inspect.isfunction(function_or_class) + or is_cython(function_or_class)): + # Set the remote function default resources. + if checkpoint_interval is not None: + raise Exception("The keyword 'checkpoint_interval' is not " + "allowed for remote functions.") + + return ray.remote_function.RemoteFunction( + function_or_class, num_cpus, num_gpus, resources, + num_return_vals, max_calls) + + if inspect.isclass(function_or_class): + if num_return_vals is not None: + raise Exception("The keyword 'num_return_vals' is not allowed " + "for actors.") + if max_calls is not None: + raise Exception("The keyword 'max_calls' is not allowed for " + "actors.") + + # Set the actor default resources. + if num_cpus is None and num_gpus is None and resources is None: + # In the default case, actors acquire no resources for + # their lifetime, and actor methods will require 1 CPU. + cpus_to_use = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE + actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE + else: + # If any resources are specified, then all resources are + # acquired for the actor's lifetime and no resources are + # associated with methods. + cpus_to_use = (DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE + if num_cpus is None else num_cpus) + actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE -def compute_function_id(func_name, func): - """Compute an function ID for a function. 
+ return worker.make_actor(function_or_class, cpus_to_use, num_gpus, + resources, actor_method_cpus, + checkpoint_interval) - Args: - func_name: The name of the function (this includes the module name plus - the function name). - func: The actual function. + raise Exception("The @ray.remote decorator must be applied to " + "either a function or to a class.") - Returns: - This returns the function ID. - """ - function_id_hash = hashlib.sha1() - # Include the function name in the hash. - function_id_hash.update(func_name.encode("ascii")) - # If we are running a script or are in IPython, include the source code in - # the hash. If we are in a regular Python interpreter we skip this part - # because the source code is not accessible. If the function is a built-in - # (e.g., Cython), the source code is not accessible. - import __main__ as main - if (hasattr(main, "__file__") or in_ipython()) \ - and inspect.isfunction(func): - function_id_hash.update(inspect.getsource(func).encode("ascii")) - # Compute the function ID. - function_id = function_id_hash.digest() - assert len(function_id) == 20 - function_id = FunctionID(function_id) - - return function_id + return decorator def remote(*args, **kwargs): - """This decorator is used to define remote functions and to define actors. + worker = get_global_worker() - Args: - num_return_vals (int): The number of object IDs that a call to this - function should return. - num_cpus (int): The number of CPUs needed to execute this function. - num_gpus (int): The number of GPUs needed to execute this function. - resources: A dictionary mapping resource name to the required quantity - of that resource. - max_calls (int): The maximum number of tasks of this kind that can be - run on a worker before the worker needs to be restarted. - checkpoint_interval (int): The number of tasks to run between - checkpoints of the actor state. - """ - worker = global_worker - - def make_remote_decorator(num_return_vals, - num_cpus, - num_gpus, - resources, - max_calls, - checkpoint_interval, - func_id=None): - def remote_decorator(func_or_class): - if inspect.isfunction(func_or_class) or is_cython(func_or_class): - # Set the remote function default resources. - resources["CPU"] = (DEFAULT_REMOTE_FUNCTION_CPUS - if num_cpus is None else num_cpus) - resources["GPU"] = (DEFAULT_REMOTE_FUNCTION_GPUS - if num_gpus is None else num_gpus) - - function_properties = FunctionProperties( - num_return_vals=num_return_vals, - resources=resources, - max_calls=max_calls) - return remote_function_decorator(func_or_class, - function_properties) - if inspect.isclass(func_or_class): - # Set the actor default resources. - if num_cpus is None and num_gpus is None and resources == {}: - # In the default case, actors acquire no resources for - # their lifetime, and actor methods will require 1 CPU. - resources["CPU"] = DEFAULT_ACTOR_CREATION_CPUS_SIMPLE_CASE - actor_method_cpus = DEFAULT_ACTOR_METHOD_CPUS_SIMPLE_CASE - else: - # If any resources are specified, then all resources are - # acquired for the actor's lifetime and no resources are - # associated with methods. 
- resources["CPU"] = ( - DEFAULT_ACTOR_CREATION_CPUS_SPECIFIED_CASE - if num_cpus is None else num_cpus) - resources["GPU"] = ( - DEFAULT_ACTOR_CREATION_GPUS_SPECIFIED_CASE - if num_gpus is None else num_gpus) - actor_method_cpus = ( - DEFAULT_ACTOR_METHOD_CPUS_SPECIFIED_CASE) - - return worker.make_actor(func_or_class, resources, - checkpoint_interval, - actor_method_cpus) - raise Exception("The @ray.remote decorator must be applied to " - "either a function or to a class.") - - def remote_function_decorator(func, function_properties): - func_name = "{}.{}".format(func.__module__, func.__name__) - if func_id is None: - function_id = compute_function_id(func_name, func) - else: - function_id = func_id - - def func_call(*args, **kwargs): - """This runs immediately when a remote function is called.""" - return _submit(args=args, kwargs=kwargs) - - def _submit(args=None, - kwargs=None, - num_return_vals=None, - num_cpus=None, - num_gpus=None, - resources=None): - """An experimental alternate way to submit remote functions.""" - check_connected() - check_main_thread() - kwargs = {} if kwargs is None else kwargs - args = signature.extend_args(function_signature, args, kwargs) - - if _mode() == PYTHON_MODE: - # In PYTHON_MODE, remote calls simply execute the function. - # We copy the arguments to prevent the function call from - # mutating them and to match the usual behavior of - # immutable remote objects. - result = func(*copy.deepcopy(args)) - return result - object_ids = _submit_task( - function_id, - args, - num_return_vals=num_return_vals, - num_cpus=num_cpus, - num_gpus=num_gpus, - resources=resources) - if len(object_ids) == 1: - return object_ids[0] - elif len(object_ids) > 1: - return object_ids - - def func_executor(arguments): - """This gets run when the remote function is executed.""" - result = func(*arguments) - return result - - def func_invoker(*args, **kwargs): - """This is used to invoke the function.""" - raise Exception("Remote functions cannot be called directly. " - "Instead of running '{}()', try '{}.remote()'." - .format(func_name, func_name)) - - func_invoker.remote = func_call - func_invoker._submit = _submit - func_invoker.executor = func_executor - func_invoker.is_remote = True - func_name = "{}.{}".format(func.__module__, func.__name__) - func_invoker.func_name = func_name - if sys.version_info >= (3, 0) or is_cython(func): - func_invoker.__doc__ = func.__doc__ - else: - func_invoker.func_doc = func.func_doc - - signature.check_signature_supported(func) - function_signature = signature.extract_signature(func) - - # Everything ready - export the function - if worker.mode in [SCRIPT_MODE, SILENT_MODE]: - export_remote_function(function_id, func_name, func, - func_invoker, function_properties) - elif worker.mode is None: - worker.cached_remote_functions_and_actors.append( - ("remote_function", (function_id, func_name, func, - func_invoker, function_properties))) - return func_invoker - - return remote_decorator + if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): + # This is the case where the decorator is just @ray.remote. + return make_decorator(worker=worker)(args[0]) + + # Parse the keyword arguments from the decorator. 
+ error_string = ("The @ray.remote decorator must be applied either " + "with no arguments and no parentheses, for example " + "'@ray.remote', or it must be applied using some of " + "the arguments 'num_return_vals', 'num_cpus', 'num_gpus', " + "'resources', 'max_calls', or 'checkpoint_interval', like " + "'@ray.remote(num_return_vals=2, " + "resources={\"CustomResource\": 1})'.") + assert len(args) == 0 and len(kwargs) > 0, error_string + for key in kwargs: + assert key in [ + "num_return_vals", "num_cpus", "num_gpus", "resources", + "max_calls", "checkpoint_interval" + ], error_string - # Handle resource arguments num_cpus = kwargs["num_cpus"] if "num_cpus" in kwargs else None num_gpus = kwargs["num_gpus"] if "num_gpus" in kwargs else None - resources = kwargs.get("resources", {}) - if not isinstance(resources, dict): + resources = kwargs.get("resources") + if not isinstance(resources, dict) and resources is not None: raise Exception("The 'resources' keyword argument must be a " "dictionary, but received type {}.".format( type(resources))) - assert "CPU" not in resources, "Use the 'num_cpus' argument." - assert "GPU" not in resources, "Use the 'num_gpus' argument." + if resources is not None: + assert "CPU" not in resources, "Use the 'num_cpus' argument." + assert "GPU" not in resources, "Use the 'num_gpus' argument." + # Handle other arguments. - num_return_vals = (kwargs["num_return_vals"] - if "num_return_vals" in kwargs else 1) - max_calls = kwargs["max_calls"] if "max_calls" in kwargs else 0 - checkpoint_interval = (kwargs["checkpoint_interval"] - if "checkpoint_interval" in kwargs else -1) - - if _mode() == WORKER_MODE: - if "function_id" in kwargs: - function_id = kwargs["function_id"] - return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - resources, max_calls, - checkpoint_interval, function_id) + num_return_vals = kwargs.get("num_return_vals") + max_calls = kwargs.get("max_calls") + checkpoint_interval = kwargs.get("checkpoint_interval") - if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): - # This is the case where the decorator is just @ray.remote. - return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - resources, max_calls, - checkpoint_interval)(args[0]) - else: - # This is the case where the decorator is something like - # @ray.remote(num_return_vals=2). 
- error_string = ("The @ray.remote decorator must be applied either " - "with no arguments and no parentheses, for example " - "'@ray.remote', or it must be applied using some of " - "the arguments 'num_return_vals', 'resources', " - "or 'max_calls', like " - "'@ray.remote(num_return_vals=2, " - "resources={\"GPU\": 1})'.") - assert len(args) == 0 and len(kwargs) > 0, error_string - for key in kwargs: - assert key in [ - "num_return_vals", "num_cpus", "num_gpus", "resources", - "max_calls", "checkpoint_interval" - ], error_string - assert "function_id" not in kwargs - return make_remote_decorator(num_return_vals, num_cpus, num_gpus, - resources, max_calls, checkpoint_interval) + return make_decorator( + num_return_vals=num_return_vals, + num_cpus=num_cpus, + num_gpus=num_gpus, + resources=resources, + max_calls=max_calls, + checkpoint_interval=checkpoint_interval, + worker=worker) diff --git a/test/runtest.py b/test/runtest.py index 9a465188f2b6..ab24f5ea1256 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1724,7 +1724,7 @@ def f(): os.environ.get("RAY_USE_XRAY") == "1", "This test does not work with xray yet.") def testBlockingTasks(self): - ray.init(num_workers=1) + ray.init(num_cpus=1) @ray.remote def f(i, j): @@ -1734,20 +1734,20 @@ def f(i, j): def g(i): # Each instance of g submits and blocks on the result of another # remote task. - object_ids = [f.remote(i, j) for j in range(10)] + object_ids = [f.remote(i, j) for j in range(2)] return ray.get(object_ids) - ray.get([g.remote(i) for i in range(100)]) + ray.get([g.remote(i) for i in range(4)]) @ray.remote def _sleep(i): - time.sleep(1) + time.sleep(0.01) return (i) @ray.remote def sleep(): # Each instance of sleep submits and blocks on the result of - # another remote task, which takes one second to execute. + # another remote task, which takes some time to execute. ray.get([_sleep.remote(i) for i in range(10)]) ray.get(sleep.remote())
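
For reference, a minimal usage sketch of the refactored decorator, assuming the Ray API of this patch's era (the RemoteFunction class that make_decorator returns is defined in ray.remote_function, which is not shown in this excerpt). It exercises both forms that remote() accepts, matching the error_string added above:

    import ray

    ray.init(num_cpus=1)

    @ray.remote                     # bare form: no arguments, no parentheses
    def double(x):
        return 2 * x

    @ray.remote(num_return_vals=2)  # parameterized form: keyword arguments only
    def split(pair):
        return pair[0], pair[1]

    a, b = split.remote((1, 2))     # two ObjectIDs, one per return value
    assert ray.get([double.remote(3), a, b]) == [6, 1, 2]

    # An empty @ray.remote() or positional arguments such as @ray.remote(2)
    # would trip the assertion guarded by error_string.

Class decoration is routed to worker.make_actor instead, and any explicit num_cpus, num_gpus, or resources value is acquired for the actor's whole lifetime, per the corresponding branch in make_decorator.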