diff --git a/doc/source/internals-overview.rst b/doc/source/internals-overview.rst index 7a762b6bdff0..109b923ecc3b 100644 --- a/doc/source/internals-overview.rst +++ b/doc/source/internals-overview.rst @@ -66,32 +66,6 @@ listens for the addition of remote functions to the centralized control state. When a new remote function is added, the thread fetches the pickled remote function, unpickles it, and can then execute that function. -Notes and limitations -~~~~~~~~~~~~~~~~~~~~~ - -- Because we export remote functions as soon as they are defined, that means - that remote functions can't close over variables that are defined after the - remote function is defined. For example, the following code gives an error. - - .. code-block:: python - - @ray.remote - def f(x): - return helper(x) - - def helper(x): - return x + 1 - - If you call ``f.remote(0)``, it will give an error of the form. - - .. code-block:: python - - Traceback (most recent call last): - File "", line 3, in f - NameError: name 'helper' is not defined - - On the other hand, if ``helper`` is defined before ``f``, then it will work. - Calling a remote function ------------------------- diff --git a/python/ray/actor.py b/python/ray/actor.py index 420fe5c3a58e..dce9a0b26074 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -186,9 +186,12 @@ class ActorClass(object): task. _resources: The default resources required by the actor creation task. _actor_method_cpus: The number of CPUs required by actor method tasks. - _last_export_session: The index of the last session in which the remote - function was exported. This is used to determine if we need to - export the remote function again. + _last_driver_id_exported_for: The ID of the driver ID of the last Ray + session during which this actor class definition was exported. This + is an imperfect mechanism used to determine if we need to export + the remote function again. It is imperfect in the sense that the + actor class definition could be exported multiple times by + different workers. _actor_methods: The actor methods. _method_decorators: Optional decorators that should be applied to the method invocation function before invoking the actor methods. These @@ -209,7 +212,7 @@ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus, self._num_cpus = num_cpus self._num_gpus = num_gpus self._resources = resources - self._last_export_session = None + self._last_driver_id_exported_for = None self._actor_methods = inspect.getmembers( self._modified_class, ray.utils.is_function_or_method) @@ -342,12 +345,13 @@ def _remote(self, *copy.deepcopy(args), **copy.deepcopy(kwargs)) else: # Export the actor. - if (self._last_export_session is None - or self._last_export_session < worker._session_index): + if (self._last_driver_id_exported_for is None + or self._last_driver_id_exported_for != + worker.task_driver_id): # If this actor class was exported in a previous session, we # need to export this function again, because current GCS # doesn't have it. - self._last_export_session = worker._session_index + self._last_driver_id_exported_for = worker.task_driver_id worker.function_actor_manager.export_actor_class( self._modified_class, self._actor_method_names) diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index e4a172fc1e71..4914c9f87050 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -342,7 +342,7 @@ def export(self, remote_function): # and export it later. self._functions_to_export.append(remote_function) return - if self._worker.mode != ray.worker.SCRIPT_MODE: + if self._worker.mode == ray.worker.LOCAL_MODE: # Don't need to export if the worker is not a driver. return self._do_export(remote_function) diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index e4828fd47bb5..44d2777a2900 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -43,9 +43,12 @@ class RemoteFunction(object): return the resulting ObjectIDs. For an example, see "test_decorated_function" in "python/ray/tests/test_basic.py". _function_signature: The function signature. - _last_export_session: The index of the last session in which the remote - function was exported. This is used to determine if we need to - export the remote function again. + _last_driver_id_exported_for: The ID of the driver ID of the last Ray + session during which this remote function definition was exported. + This is an imperfect mechanism used to determine if we need to + export the remote function again. It is imperfect in the sense that + the actor class definition could be exported multiple times by + different workers. """ def __init__(self, function, num_cpus, num_gpus, resources, @@ -69,10 +72,7 @@ def __init__(self, function, num_cpus, num_gpus, resources, self._function_signature = ray.signature.extract_signature( self._function) - # Export the function. - worker = ray.worker.get_global_worker() - self._last_export_session = worker._session_index - worker.function_actor_manager.export(self) + self._last_driver_id_exported_for = None def __call__(self, *args, **kwargs): raise Exception("Remote functions cannot be called directly. Instead " @@ -111,10 +111,11 @@ def _remote(self, worker = ray.worker.get_global_worker() worker.check_connected() - if self._last_export_session < worker._session_index: + if (self._last_driver_id_exported_for is None + or self._last_driver_id_exported_for != worker.task_driver_id): # If this function was exported in a previous session, we need to # export this function again, because current GCS doesn't have it. - self._last_export_session = worker._session_index + self._last_driver_id_exported_for = worker.task_driver_id worker.function_actor_manager.export(self) kwargs = {} if kwargs is None else kwargs diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 056aedd4f86c..d6eebe1517bb 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -303,6 +303,23 @@ def f(x): assert_equal(obj, ray.get(ray.put(obj))) +def test_nested_functions(ray_start_regular): + # Make sure that remote functions can use other values that are defined + # after the remote function but before the first function invocation. + @ray.remote + def f(): + return g(), ray.get(h.remote()) + + def g(): + return 1 + + @ray.remote + def h(): + return 2 + + assert ray.get(f.remote()) == (1, 2) + + def test_ray_recursive_objects(ray_start_regular): class ClassA(object): pass @@ -2968,3 +2985,17 @@ def method(self): ray.get(f.remote()) a = Actor.remote() ray.get(a.method.remote()) + + ray.shutdown() + + # Start Ray again and make sure that these definitions can be exported from + # workers. + ray.init(num_cpus=2) + + @ray.remote + def export_definitions_from_worker(remote_function, actor_class): + ray.get(remote_function.remote()) + actor_handle = actor_class.remote() + ray.get(actor_handle.method.remote()) + + ray.get(export_definitions_from_worker.remote(f, Actor)) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 650cce68b246..6a782ee726da 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -95,7 +95,15 @@ def temporary_helper_function(): # fail when it is unpickled. @ray.remote def g(): - return module.temporary_python_file() + try: + module.temporary_python_file() + except Exception: + # This test is not concerned with the error from running this + # function. Only from unpickling the remote function. + pass + + # Invoke the function so that the definition is exported. + g.remote() wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2) errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR) @@ -499,6 +507,9 @@ def test_export_large_objects(ray_start_regular): def f(): large_object + # Invoke the function so that the definition is exported. + f.remote() + # Make sure that a warning is generated. wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1) diff --git a/python/ray/tests/test_monitors.py b/python/ray/tests/test_monitors.py index d588732c11f4..36ed55a52474 100644 --- a/python/ray/tests/test_monitors.py +++ b/python/ray/tests/test_monitors.py @@ -46,13 +46,6 @@ def Driver(success): # Two new objects. ray.get(ray.put(1111)) ray.get(ray.put(1111)) - attempts = 0 - while (2, 1, summary_start[2]) != StateSummary(): - time.sleep(0.1) - attempts += 1 - if attempts == max_attempts_before_failing: - success.value = False - break @ray.remote def f(): @@ -61,7 +54,7 @@ def f(): # 1 new function. attempts = 0 - while (2, 1, summary_start[2] + 1) != StateSummary(): + while (2, 1, summary_start[2]) != StateSummary(): time.sleep(0.1) attempts += 1 if attempts == max_attempts_before_failing: