diff --git a/python/ray/actor.py b/python/ray/actor.py index d5e48a12cdab..f0fc2426e744 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import cloudpickle as pickle import hashlib import inspect import json @@ -10,7 +11,6 @@ import traceback import ray.local_scheduler -import ray.pickling as pickling import ray.signature as signature import ray.worker from ray.utils import random_string, binary_to_hex, hex_to_binary @@ -72,7 +72,7 @@ def temporary_actor_method(*xs): temporary_actor_method) try: - unpickled_class = pickling.loads(pickled_class) + unpickled_class = pickle.loads(pickled_class) except Exception: # If an exception was thrown when the actor was imported, we record the # traceback and notify the scheduler of the failure. @@ -207,7 +207,7 @@ def export_actor_class(class_id, Class, actor_method_names, worker): d = {"driver_id": worker.task_driver_id.id(), "class_name": Class.__name__, "module": Class.__module__, - "class": pickling.dumps(Class), + "class": pickle.dumps(Class), "actor_method_names": json.dumps(list(actor_method_names))} worker.redis_client.hmset(key, d) worker.redis_client.rpush("Exports", key) diff --git a/python/ray/pickling.py b/python/ray/pickling.py deleted file mode 100644 index 9e6d68b68f89..000000000000 --- a/python/ray/pickling.py +++ /dev/null @@ -1,87 +0,0 @@ -# Note that a little bit of code here is taken and slightly modified from the -# pickler because it was not possible to change its behavior otherwise. - -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -from ctypes import c_void_p -from cloudpickle import pickle, cloudpickle, CloudPickler, load, loads - -__all__ = ["load", "loads", "dump", "dumps"] - -try: - from ctypes import pythonapi - pythonapi.PyCell_Set # Make sure this exists -except: - pythonapi = None - - -def dump(obj, file, protocol=2): - return BetterPickler(file, protocol).dump(obj) - - -def dumps(obj): - stringio = cloudpickle.StringIO() - dump(obj, stringio) - return stringio.getvalue() - - -def _make_skel_func(code, closure, base_globals=None): - """Create a skeleton function object. - - Creates a skeleton function object that contains just the provided code and - the correct number of cells in func_closure. All other func attributes - (e.g. func_globals) are empty. - """ - if base_globals is None: - base_globals = {} - base_globals["__builtins__"] = __builtins__ - return _make_skel_func.__class__(code, base_globals, None, None, - tuple(closure)) - - -def _fill_function(func, globals, defaults, closure, dict): - """Fill in the resst of the function data. - - This fills in the rest of function data into the skeleton function object - that were created via _make_skel_func(), including closures. - """ - result = cloudpickle._fill_function(func, globals, defaults, dict) - if pythonapi is not None: - for i, v in enumerate(closure): - pythonapi.PyCell_Set(c_void_p(id(result.__closure__[i])), - c_void_p(id(v))) - return result - - -class BetterPickler(CloudPickler): - def save_function_tuple(self, func): - (code, f_globals, defaults, - closure, dct, base_globals) = self.extract_func_data(func) - - self.save(_fill_function) - self.write(pickle.MARK) - - self.save(_make_skel_func if pythonapi else cloudpickle._make_skel_func) - self.save((code, - (map(lambda _: cloudpickle._make_cell(None), closure) - if closure and pythonapi is not None - else closure), - base_globals)) - self.write(pickle.REDUCE) - self.memoize(func) - - self.save(f_globals) - self.save(defaults) - self.save(closure) - self.save(dct) - self.write(pickle.TUPLE) - self.write(pickle.REDUCE) - - def save_cell(self, obj): - self.save(cloudpickle._make_cell) - self.save((obj.cell_contents,)) - self.write(pickle.REDUCE) - dispatch = CloudPickler.dispatch.copy() - dispatch[(lambda _: lambda: _)(0).__closure__[0].__class__] = save_cell diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 43c5eebdfd8f..bd66a83ba636 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -2,8 +2,9 @@ from __future__ import division from __future__ import print_function +import cloudpickle as pickle + import ray.numbuf -import ray.pickling as pickling class RaySerializationException(Exception): @@ -123,7 +124,7 @@ def serialize(obj): class_id = type_to_class_id[type(obj)] if class_id in classes_to_pickle: - serialized_obj = {"data": pickling.dumps(obj), + serialized_obj = {"data": pickle.dumps(obj), "pickle": True} elif class_id in custom_serializers: serialized_obj = {"data": custom_serializers[class_id](obj)} @@ -160,7 +161,7 @@ def deserialize(serialized_obj): if "pickle" in serialized_obj: # The object was pickled, so unpickle it. - obj = pickling.loads(serialized_obj["data"]) + obj = pickle.loads(serialized_obj["data"]) else: assert class_id not in classes_to_pickle if class_id not in whitelisted_classes: diff --git a/python/ray/worker.py b/python/ray/worker.py index dbedd4e3d721..b819208d5674 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -3,6 +3,7 @@ from __future__ import print_function import atexit +import cloudpickle as pickle import collections import colorama import copy @@ -20,7 +21,6 @@ # Ray modules import ray.experimental.state as state -import ray.pickling as pickling import ray.serialization as serialization import ray.services as services import ray.signature as signature @@ -497,7 +497,7 @@ def run_function_on_all_workers(self, function): # Attempt to pickle the function before we need it. This could fail, and # it is more convenient if the failure happens before we actually run the # function locally. - pickled_function = pickling.dumps(function) + pickled_function = pickle.dumps(function) function_to_run_id = random_string() key = "FunctionsToRun:{}".format(function_to_run_id) @@ -1087,7 +1087,7 @@ def f(): num_gpus) try: - function = pickling.loads(serialized_function) + function = pickle.loads(serialized_function) except: # If an exception was thrown when the remote function was imported, we # record the traceback and notify the scheduler of the failure. @@ -1117,7 +1117,7 @@ def fetch_and_execute_function_to_run(key, worker=global_worker): counter = worker.redis_client.hincrby(worker.node_ip_address, key, 1) - 1 try: # Deserialize the function. - function = pickling.loads(serialized_function) + function = pickle.loads(serialized_function) # Run the function. function({"counter": counter}) except: @@ -1843,7 +1843,7 @@ def export_remote_function(function_id, func_name, func, func_invoker, # Allow the function to reference itself as a global variable func.__globals__[func.__name__] = func_invoker try: - pickled_func = pickling.dumps(func) + pickled_func = pickle.dumps(func) finally: # Undo our changes if func_name_global_valid: