Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 220 additions & 37 deletions python/pyspark/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@

import dis
from functools import partial
import importlib
import io
import itertools
import logging
Expand All @@ -56,12 +55,26 @@
import traceback
import types
import weakref
import uuid
import threading


try:
from enum import Enum
except ImportError:
Enum = None

# cloudpickle is meant for inter process communication: we expect all
# communicating processes to run the same Python version hence we favor
# communication speed over compatibility:
DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL

# Track the provenance of reconstructed dynamic classes to make it possible to
# recontruct instances from the matching singleton class definition when
# appropriate and preserve the usual "isinstance" semantics of Python objects.
_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary()
_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary()
_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock()

if sys.version_info[0] < 3: # pragma: no branch
from pickle import Pickler
Expand All @@ -71,12 +84,37 @@
from StringIO import StringIO
string_types = (basestring,) # noqa
PY3 = False
PY2 = True
PY2_WRAPPER_DESCRIPTOR_TYPE = type(object.__init__)
PY2_METHOD_WRAPPER_TYPE = type(object.__eq__)
PY2_CLASS_DICT_BLACKLIST = (PY2_METHOD_WRAPPER_TYPE,
PY2_WRAPPER_DESCRIPTOR_TYPE)
else:
types.ClassType = type
from pickle import _Pickler as Pickler
from io import BytesIO as StringIO
string_types = (str,)
PY3 = True
PY2 = False


def _ensure_tracking(class_def):
with _DYNAMIC_CLASS_TRACKER_LOCK:
class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def)
if class_tracker_id is None:
class_tracker_id = uuid.uuid4().hex
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
_DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def
return class_tracker_id


def _lookup_class_or_track(class_tracker_id, class_def):
if class_tracker_id is not None:
with _DYNAMIC_CLASS_TRACKER_LOCK:
class_def = _DYNAMIC_CLASS_TRACKER_BY_ID.setdefault(
class_tracker_id, class_def)
_DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id
return class_def


def _make_cell_set_template_code():
Expand Down Expand Up @@ -112,7 +150,7 @@ def inner(value):
# NOTE: we are marking the cell variable as a free variable intentionally
# so that we simulate an inner function instead of the outer function. This
# is what gives us the ``nonlocal`` behavior in a Python 2 compatible way.
if not PY3: # pragma: no branch
if PY2: # pragma: no branch
return types.CodeType(
co.co_argcount,
co.co_nlocals,
Expand All @@ -130,24 +168,43 @@ def inner(value):
(),
)
else:
return types.CodeType(
co.co_argcount,
co.co_kwonlyargcount,
co.co_nlocals,
co.co_stacksize,
co.co_flags,
co.co_code,
co.co_consts,
co.co_names,
co.co_varnames,
co.co_filename,
co.co_name,
co.co_firstlineno,
co.co_lnotab,
co.co_cellvars, # this is the trickery
(),
)

if hasattr(types.CodeType, "co_posonlyargcount"): # pragma: no branch
return types.CodeType(
co.co_argcount,
co.co_posonlyargcount, # Python3.8 with PEP570
co.co_kwonlyargcount,
co.co_nlocals,
co.co_stacksize,
co.co_flags,
co.co_code,
co.co_consts,
co.co_names,
co.co_varnames,
co.co_filename,
co.co_name,
co.co_firstlineno,
co.co_lnotab,
co.co_cellvars, # this is the trickery
(),
)
else:
return types.CodeType(
co.co_argcount,
co.co_kwonlyargcount,
co.co_nlocals,
co.co_stacksize,
co.co_flags,
co.co_code,
co.co_consts,
co.co_names,
co.co_varnames,
co.co_filename,
co.co_name,
co.co_firstlineno,
co.co_lnotab,
co.co_cellvars, # this is the trickery
(),
)

_cell_set_template_code = _make_cell_set_template_code()

Expand Down Expand Up @@ -220,7 +277,7 @@ def _walk_global_ops(code):
global-referencing instructions in *code*.
"""
code = getattr(code, 'co_code', b'')
if not PY3: # pragma: no branch
if PY2: # pragma: no branch
code = map(ord, code)

n = len(code)
Expand Down Expand Up @@ -250,6 +307,39 @@ def _walk_global_ops(code):
yield op, instr.arg


def _extract_class_dict(cls):
"""Retrieve a copy of the dict of a class without the inherited methods"""
clsdict = dict(cls.__dict__) # copy dict proxy to a dict
if len(cls.__bases__) == 1:
inherited_dict = cls.__bases__[0].__dict__
else:
inherited_dict = {}
for base in reversed(cls.__bases__):
inherited_dict.update(base.__dict__)
to_remove = []
for name, value in clsdict.items():
try:
base_value = inherited_dict[name]
if value is base_value:
to_remove.append(name)
elif PY2:
# backward compat for Python 2
if hasattr(value, "im_func"):
if value.im_func is getattr(base_value, "im_func", None):
to_remove.append(name)
elif isinstance(value, PY2_CLASS_DICT_BLACKLIST):
# On Python 2 we have no way to pickle those specific
# methods types nor to check that they are actually
# inherited. So we assume that they are always inherited
# from builtin types.
to_remove.append(name)
except KeyError:
pass
for name in to_remove:
clsdict.pop(name)
return clsdict


class CloudPickler(Pickler):

dispatch = Pickler.dispatch.copy()
Expand Down Expand Up @@ -277,7 +367,7 @@ def save_memoryview(self, obj):

dispatch[memoryview] = save_memoryview

if not PY3: # pragma: no branch
if PY2: # pragma: no branch
def save_buffer(self, obj):
self.save(str(obj))

Expand All @@ -300,12 +390,23 @@ def save_codeobject(self, obj):
Save a code object
"""
if PY3: # pragma: no branch
args = (
obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames,
obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
obj.co_cellvars
)
if hasattr(obj, "co_posonlyargcount"): # pragma: no branch
args = (
obj.co_argcount, obj.co_posonlyargcount,
obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
obj.co_varnames, obj.co_filename, obj.co_name,
obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
obj.co_cellvars
)
else:
args = (
obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals,
obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts,
obj.co_names, obj.co_varnames, obj.co_filename,
obj.co_name, obj.co_firstlineno, obj.co_lnotab,
obj.co_freevars, obj.co_cellvars
)
else:
args = (
obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
Expand Down Expand Up @@ -460,15 +561,40 @@ def func():
# then discards the reference to it
self.write(pickle.POP)

def save_dynamic_class(self, obj):
def _save_dynamic_enum(self, obj, clsdict):
"""Special handling for dynamic Enum subclasses

Use a dedicated Enum constructor (inspired by EnumMeta.__call__) as the
EnumMeta metaclass has complex initialization that makes the Enum
subclasses hold references to their own instances.
"""
Save a class that can't be stored as module global.
members = dict((e.name, e.value) for e in obj)

# Python 2.7 with enum34 can have no qualname:
qualname = getattr(obj, "__qualname__", None)

self.save_reduce(_make_skeleton_enum,
(obj.__bases__, obj.__name__, qualname, members,
obj.__module__, _ensure_tracking(obj), None),
obj=obj)

# Cleanup the clsdict that will be passed to _rehydrate_skeleton_class:
# Those attributes are already handled by the metaclass.
for attrname in ["_generate_next_value_", "_member_names_",
"_member_map_", "_member_type_",
"_value2member_map_"]:
clsdict.pop(attrname, None)
for member in members:
clsdict.pop(member)

def save_dynamic_class(self, obj):
"""Save a class that can't be stored as module global.

This method is used to serialize classes that are defined inside
functions, or that otherwise can't be serialized as attribute lookups
from global modules.
"""
clsdict = dict(obj.__dict__) # copy dict proxy to a dict
clsdict = _extract_class_dict(obj)
clsdict.pop('__weakref__', None)

# For ABCMeta in python3.7+, remove _abc_impl as it is not picklable.
Expand Down Expand Up @@ -496,8 +622,8 @@ def save_dynamic_class(self, obj):
for k in obj.__slots__:
clsdict.pop(k, None)

# If type overrides __dict__ as a property, include it in the type kwargs.
# In Python 2, we can't set this attribute after construction.
# If type overrides __dict__ as a property, include it in the type
# kwargs. In Python 2, we can't set this attribute after construction.
__dict__ = clsdict.pop('__dict__', None)
if isinstance(__dict__, property):
type_kwargs['__dict__'] = __dict__
Expand All @@ -524,8 +650,16 @@ def save_dynamic_class(self, obj):
write(pickle.MARK)

# Create and memoize an skeleton class with obj's name and bases.
tp = type(obj)
self.save_reduce(tp, (obj.__name__, obj.__bases__, type_kwargs), obj=obj)
if Enum is not None and issubclass(obj, Enum):
# Special handling of Enum subclasses
self._save_dynamic_enum(obj, clsdict)
else:
# "Regular" class definition:
tp = type(obj)
self.save_reduce(_make_skeleton_class,
(tp, obj.__name__, obj.__bases__, type_kwargs,
_ensure_tracking(obj), None),
obj=obj)

# Now save the rest of obj's __dict__. Any references to obj
# encountered while saving will point to the skeleton class.
Expand Down Expand Up @@ -778,7 +912,7 @@ def save_inst(self, obj):
save(stuff)
write(pickle.BUILD)

if not PY3: # pragma: no branch
if PY2: # pragma: no branch
dispatch[types.InstanceType] = save_inst

def save_property(self, obj):
Expand Down Expand Up @@ -1119,6 +1253,22 @@ def _make_skel_func(code, cell_count, base_globals=None):
return types.FunctionType(code, base_globals, None, None, closure)


def _make_skeleton_class(type_constructor, name, bases, type_kwargs,
class_tracker_id, extra):
"""Build dynamic class with an empty __dict__ to be filled once memoized

If class_tracker_id is not None, try to lookup an existing class definition
matching that id. If none is found, track a newly reconstructed class
definition under that id so that other instances stemming from the same
class id will also reuse this class definition.

The "extra" variable is meant to be a dict (or None) that can be used for
forward compatibility shall the need arise.
"""
skeleton_class = type_constructor(name, bases, type_kwargs)
return _lookup_class_or_track(class_tracker_id, skeleton_class)


def _rehydrate_skeleton_class(skeleton_class, class_dict):
"""Put attributes from `class_dict` back on `skeleton_class`.

Expand All @@ -1137,6 +1287,39 @@ def _rehydrate_skeleton_class(skeleton_class, class_dict):
return skeleton_class


def _make_skeleton_enum(bases, name, qualname, members, module,
class_tracker_id, extra):
"""Build dynamic enum with an empty __dict__ to be filled once memoized

The creation of the enum class is inspired by the code of
EnumMeta._create_.

If class_tracker_id is not None, try to lookup an existing enum definition
matching that id. If none is found, track a newly reconstructed enum
definition under that id so that other instances stemming from the same
class id will also reuse this enum definition.

The "extra" variable is meant to be a dict (or None) that can be used for
forward compatibility shall the need arise.
"""
# enums always inherit from their base Enum class at the last position in
# the list of base classes:
enum_base = bases[-1]
metacls = enum_base.__class__
classdict = metacls.__prepare__(name, bases)

for member_name, member_value in members.items():
classdict[member_name] = member_value
enum_class = metacls.__new__(metacls, name, bases, classdict)
enum_class.__module__ = module

# Python 2.7 compat
if qualname is not None:
enum_class.__qualname__ = qualname

return _lookup_class_or_track(class_tracker_id, enum_class)


def _is_dynamic(module):
"""
Return True if the module is special module that cannot be imported by its
Expand Down Expand Up @@ -1176,4 +1359,4 @@ def _reduce_method_descriptor(obj):
import copy_reg as copyreg
except ImportError:
import copyreg
copyreg.pickle(method_descriptor, _reduce_method_descriptor)
copyreg.pickle(method_descriptor, _reduce_method_descriptor)
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def _supports_symlinks():
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy']
)
Expand Down