Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ray/cloudpickle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

from ray.cloudpickle.cloudpickle import *

__version__ = '0.8.0.dev0'
__version__ = '0.9.0.dev0'
123 changes: 88 additions & 35 deletions python/ray/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
"""
This class is defined to override standard pickle functionality

The goals of it follow:
-Serialize lambdas and nested functions to compiled byte code
-Deal with main module correctly
-Deal with other non-serializable objects

It does not include an unpickler, as standard python unpickling suffices.

This module was extracted from the `cloud` package, developed by `PiCloud, Inc.
<https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.

Copyright (c) 2012, Regents of the University of California.
Copyright (c) 2009 `PiCloud, Inc. <https://web.archive.org/web/20140626004012/http://www.picloud.com/>`_.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
Expand All @@ -27,7 +22,6 @@
names of its contributors may be used to endorse or promote
products derived from this software without specific prior written
permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
Expand All @@ -44,7 +38,6 @@

import dis
from functools import partial
import importlib
import io
import itertools
import logging
Expand All @@ -56,12 +49,26 @@
import traceback
import types
import weakref
import uuid
import threading


try:
from enum import Enum
except ImportError:
Enum = None

# cloudpickle is meant for inter process communication: we expect all
# communicating processes to run the same Python version hence we favor
# communication speed over compatibility:
DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

# Track the provenance of reconstructed dynamic classes to make it possible to
# recontruct instances from the matching singleton class definition when
# appropriate and preserve the usual "isinstance" semantics of Python objects.
_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock()

if sys.version_info[0] < 3: # pragma: no branch
from pickle import Pickler
Expand All @@ -79,26 +86,38 @@
PY3 = True


def _ensure_tracking(class_def):
with _DYNAMIC_CLASS_TRACKER_LOCK:
class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def)
if class_tracker_id is None:
class_tracker_id = uuid.uuid4().hex
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
_DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
return class_tracker_id


def _lookup_class_or_track(class_tracker_id, class_def):
if class_tracker_id is not None:
with _DYNAMIC_CLASS_TRACKER_LOCK:
class_def = _DYNAMIC_CLASS_TRACKER_BY_ID.setdefault(
class_tracker_id, class_def)
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
return class_def


def _make_cell_set_template_code():
"""Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF

Notes
-----
In Python 3, we could use an easier function:

.. code-block:: python

def f():
cell = None

def _stub(value):
nonlocal cell
cell = value

return _stub

_cell_set_template_code = f().__code__

This function is _only_ a LOAD_FAST(arg); STORE_DEREF, but that is
invalid syntax on Python 2. If we use this function we also don't need
to do the weird freevars/cellvars swap below
Expand Down Expand Up @@ -318,7 +337,6 @@ def save_codeobject(self, obj):

def save_function(self, obj, name=None):
""" Registered with the dispatch to handle all function types.

Determines what kind of function obj is (e.g. lambda, defined at
interactive prompt, etc) and handles the pickling appropriately.
"""
Expand Down Expand Up @@ -416,26 +434,18 @@ def save_function(self, obj, name=None):
def _save_subimports(self, code, top_level_dependencies):
"""
Save submodules used by a function but not listed in its globals.

In the example below:

```
import concurrent.futures
import cloudpickle


def func():
x = concurrent.futures.ThreadPoolExecutor


if __name__ == '__main__':
cloudpickle.dumps(func)
```

the globals extracted by cloudpickle in the function's state include
the concurrent module, but not its submodule (here,
concurrent.futures), which is the module used by func.

To ensure that calling the depickled function does not raise an
AttributeError, this function looks for any currently loaded submodule
that the function uses and whose parent is present in the function
Expand All @@ -460,14 +470,42 @@ def func():
# then discards the reference to it
self.write(pickle.POP)

def _save_dynamic_enum(self, obj):
"""Special handling for dynamic Enum subclasses
Use the Enum functional API (inherited from EnumMeta.__call__) as the
EnumMeta metaclass has complex initialization that makes the Enum
subclasses hold references to their own instances.
"""
class_tracker_id = _ensure_tracking(obj)

# XXX: shall we pass type and start kwargs? If so how to retrieve the
# correct info from obj.
elements = dict((e.name, e.value) for e in obj)

if obj.__doc__ is not obj.__base__.__doc__:
doc = obj.__doc__
else:
doc = None

extra = {}
if hasattr(obj, "__qualname__"):
extra["qualname"] = obj.__qualname__

self.save_reduce(_make_dynamic_enum,
(obj.__base__, obj.__name__, elements, doc,
obj.__module__, class_tracker_id, extra),
obj=obj)

def save_dynamic_class(self, obj):
"""
Save a class that can't be stored as module global.

This method is used to serialize classes that are defined inside
functions, or that otherwise can't be serialized as attribute lookups
from global modules.
"""
if Enum is not None and issubclass(obj, Enum):
return self._save_dynamic_enum(obj)

clsdict = dict(obj.__dict__) # copy dict proxy to a dict
clsdict.pop('__weakref__', None)

Expand Down Expand Up @@ -525,7 +563,10 @@ def save_dynamic_class(self, obj):

# Create and memoize an skeleton class with obj's name and bases.
tp = type(obj)
self.save_reduce(tp, (obj.__name__, obj.__bases__, type_kwargs), obj=obj)
self.save_reduce(_make_skeleton_class,
(tp, obj.__name__, obj.__bases__, type_kwargs,
_ensure_tracking(obj), {}),
obj=obj)

# Now save the rest of obj's __dict__. Any references to obj
# encountered while saving will point to the skeleton class.
Expand All @@ -539,7 +580,6 @@ def save_dynamic_class(self, obj):

def save_function_tuple(self, func):
""" Pickles an actual func object.

A func comprises: code, globals, defaults, closure, and dict. We
extract and save these, injecting reducing functions at certain points
to recreate the func object. Keep in mind that some of these pieces
Expand Down Expand Up @@ -678,7 +718,6 @@ def save_builtin_function(self, obj):
def save_global(self, obj, name=None, pack=struct.pack):
"""
Save a "global".

The name of this method is somewhat misleading: all types get
dispatched here.
"""
Expand Down Expand Up @@ -925,11 +964,9 @@ def _rebuild_tornado_coroutine(func):

def dump(obj, file, protocol=None):
"""Serialize obj as bytes streamed into file

protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed
between processes running the same Python version.

Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
compatibility with older versions of Python.
"""
Expand All @@ -938,11 +975,9 @@ def dump(obj, file, protocol=None):

def dumps(obj, protocol=None):
"""Serialize obj as a string of bytes allocated in memory

protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed
between processes running the same Python version.

Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
compatibility with older versions of Python.
"""
Expand Down Expand Up @@ -1034,12 +1069,10 @@ def _get_cell_contents(cell):

def instance(cls):
"""Create a new instance of a class.

Parameters
----------
cls : type
The class to create an instance of.

Returns
-------
instance : cls
Expand All @@ -1059,7 +1092,6 @@ def __reduce__(cls):

def _fill_function(*args):
"""Fills in the rest of function data into the skeleton function object

The skeleton itself is create by _make_skel_func().
"""
if len(args) == 2:
Expand Down Expand Up @@ -1143,9 +1175,22 @@ def _make_skel_func(code, cell_count, base_globals=None):
return types.FunctionType(code, base_globals, None, None, closure)


def _make_skeleton_class(type_constructor, name, bases, type_kwargs,
class_tracker_id, extra):
"""Build dynamic class with an empty __dict__ to be filled once memoized
If class_tracker_id is not None, try to lookup an existing class definition
matching that id. If none is found, track a newly reconstructed class
definition under that id so that other instances stemming from the same
class id will also reuse this class definition.
The extra variable is meant to be a dict use for forward compatibility
shall the need arise.
"""
skeleton_class = type_constructor(name, bases, type_kwargs)
return _lookup_class_or_track(class_tracker_id, skeleton_class)


def _rehydrate_skeleton_class(skeleton_class, class_dict):
"""Put attributes from `class_dict` back on `skeleton_class`.

See CloudPickler.save_dynamic_class for more info.
"""
registry = None
Expand All @@ -1161,6 +1206,14 @@ def _rehydrate_skeleton_class(skeleton_class, class_dict):
return skeleton_class


def _make_dynamic_enum(base, name, elements, doc, module, class_tracker_id,
extra):
class_def = base(name, elements, module=module, **extra)
if doc is not None:
class_def.__doc__ = doc
return _lookup_class_or_track(class_tracker_id, class_def)


def _is_dynamic(module):
"""
Return True if the module is special module that cannot be imported by its
Expand Down