diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 7df5f6c748ad..09d3a5e7cfb6 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -44,7 +44,6 @@ import dis from functools import partial -import importlib import io import itertools import logging @@ -56,12 +55,26 @@ import traceback import types import weakref +import uuid +import threading + + +try: + from enum import Enum +except ImportError: + Enum = None # cloudpickle is meant for inter process communication: we expect all # communicating processes to run the same Python version hence we favor # communication speed over compatibility: DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL +# Track the provenance of reconstructed dynamic classes to make it possible to +# recontruct instances from the matching singleton class definition when +# appropriate and preserve the usual "isinstance" semantics of Python objects. +_DYNAMIC_CLASS_TRACKER_BY_CLASS = weakref.WeakKeyDictionary() +_DYNAMIC_CLASS_TRACKER_BY_ID = weakref.WeakValueDictionary() +_DYNAMIC_CLASS_TRACKER_LOCK = threading.Lock() if sys.version_info[0] < 3: # pragma: no branch from pickle import Pickler @@ -71,12 +84,37 @@ from StringIO import StringIO string_types = (basestring,) # noqa PY3 = False + PY2 = True + PY2_WRAPPER_DESCRIPTOR_TYPE = type(object.__init__) + PY2_METHOD_WRAPPER_TYPE = type(object.__eq__) + PY2_CLASS_DICT_BLACKLIST = (PY2_METHOD_WRAPPER_TYPE, + PY2_WRAPPER_DESCRIPTOR_TYPE) else: types.ClassType = type from pickle import _Pickler as Pickler from io import BytesIO as StringIO string_types = (str,) PY3 = True + PY2 = False + + +def _ensure_tracking(class_def): + with _DYNAMIC_CLASS_TRACKER_LOCK: + class_tracker_id = _DYNAMIC_CLASS_TRACKER_BY_CLASS.get(class_def) + if class_tracker_id is None: + class_tracker_id = uuid.uuid4().hex + _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id + _DYNAMIC_CLASS_TRACKER_BY_ID[class_tracker_id] = class_def + return class_tracker_id + + +def _lookup_class_or_track(class_tracker_id, class_def): + if class_tracker_id is not None: + with _DYNAMIC_CLASS_TRACKER_LOCK: + class_def = _DYNAMIC_CLASS_TRACKER_BY_ID.setdefault( + class_tracker_id, class_def) + _DYNAMIC_CLASS_TRACKER_BY_CLASS[class_def] = class_tracker_id + return class_def def _make_cell_set_template_code(): @@ -112,7 +150,7 @@ def inner(value): # NOTE: we are marking the cell variable as a free variable intentionally # so that we simulate an inner function instead of the outer function. This # is what gives us the ``nonlocal`` behavior in a Python 2 compatible way. - if not PY3: # pragma: no branch + if PY2: # pragma: no branch return types.CodeType( co.co_argcount, co.co_nlocals, @@ -130,24 +168,43 @@ def inner(value): (), ) else: - return types.CodeType( - co.co_argcount, - co.co_kwonlyargcount, - co.co_nlocals, - co.co_stacksize, - co.co_flags, - co.co_code, - co.co_consts, - co.co_names, - co.co_varnames, - co.co_filename, - co.co_name, - co.co_firstlineno, - co.co_lnotab, - co.co_cellvars, # this is the trickery - (), - ) - + if hasattr(types.CodeType, "co_posonlyargcount"): # pragma: no branch + return types.CodeType( + co.co_argcount, + co.co_posonlyargcount, # Python3.8 with PEP570 + co.co_kwonlyargcount, + co.co_nlocals, + co.co_stacksize, + co.co_flags, + co.co_code, + co.co_consts, + co.co_names, + co.co_varnames, + co.co_filename, + co.co_name, + co.co_firstlineno, + co.co_lnotab, + co.co_cellvars, # this is the trickery + (), + ) + else: + return types.CodeType( + co.co_argcount, + co.co_kwonlyargcount, + co.co_nlocals, + co.co_stacksize, + co.co_flags, + co.co_code, + co.co_consts, + co.co_names, + co.co_varnames, + co.co_filename, + co.co_name, + co.co_firstlineno, + co.co_lnotab, + co.co_cellvars, # this is the trickery + (), + ) _cell_set_template_code = _make_cell_set_template_code() @@ -220,7 +277,7 @@ def _walk_global_ops(code): global-referencing instructions in *code*. """ code = getattr(code, 'co_code', b'') - if not PY3: # pragma: no branch + if PY2: # pragma: no branch code = map(ord, code) n = len(code) @@ -250,6 +307,39 @@ def _walk_global_ops(code): yield op, instr.arg +def _extract_class_dict(cls): + """Retrieve a copy of the dict of a class without the inherited methods""" + clsdict = dict(cls.__dict__) # copy dict proxy to a dict + if len(cls.__bases__) == 1: + inherited_dict = cls.__bases__[0].__dict__ + else: + inherited_dict = {} + for base in reversed(cls.__bases__): + inherited_dict.update(base.__dict__) + to_remove = [] + for name, value in clsdict.items(): + try: + base_value = inherited_dict[name] + if value is base_value: + to_remove.append(name) + elif PY2: + # backward compat for Python 2 + if hasattr(value, "im_func"): + if value.im_func is getattr(base_value, "im_func", None): + to_remove.append(name) + elif isinstance(value, PY2_CLASS_DICT_BLACKLIST): + # On Python 2 we have no way to pickle those specific + # methods types nor to check that they are actually + # inherited. So we assume that they are always inherited + # from builtin types. + to_remove.append(name) + except KeyError: + pass + for name in to_remove: + clsdict.pop(name) + return clsdict + + class CloudPickler(Pickler): dispatch = Pickler.dispatch.copy() @@ -277,7 +367,7 @@ def save_memoryview(self, obj): dispatch[memoryview] = save_memoryview - if not PY3: # pragma: no branch + if PY2: # pragma: no branch def save_buffer(self, obj): self.save(str(obj)) @@ -300,12 +390,23 @@ def save_codeobject(self, obj): Save a code object """ if PY3: # pragma: no branch - args = ( - obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, - obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames, - obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, - obj.co_cellvars - ) + if hasattr(obj, "co_posonlyargcount"): # pragma: no branch + args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) + else: + args = ( + obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, + obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, + obj.co_names, obj.co_varnames, obj.co_filename, + obj.co_name, obj.co_firstlineno, obj.co_lnotab, + obj.co_freevars, obj.co_cellvars + ) else: args = ( obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code, @@ -460,15 +561,40 @@ def func(): # then discards the reference to it self.write(pickle.POP) - def save_dynamic_class(self, obj): + def _save_dynamic_enum(self, obj, clsdict): + """Special handling for dynamic Enum subclasses + + Use a dedicated Enum constructor (inspired by EnumMeta.__call__) as the + EnumMeta metaclass has complex initialization that makes the Enum + subclasses hold references to their own instances. """ - Save a class that can't be stored as module global. + members = dict((e.name, e.value) for e in obj) + + # Python 2.7 with enum34 can have no qualname: + qualname = getattr(obj, "__qualname__", None) + + self.save_reduce(_make_skeleton_enum, + (obj.__bases__, obj.__name__, qualname, members, + obj.__module__, _ensure_tracking(obj), None), + obj=obj) + + # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class: + # Those attributes are already handled by the metaclass. + for attrname in ["_generate_next_value_", "_member_names_", + "_member_map_", "_member_type_", + "_value2member_map_"]: + clsdict.pop(attrname, None) + for member in members: + clsdict.pop(member) + + def save_dynamic_class(self, obj): + """Save a class that can't be stored as module global. This method is used to serialize classes that are defined inside functions, or that otherwise can't be serialized as attribute lookups from global modules. """ - clsdict = dict(obj.__dict__) # copy dict proxy to a dict + clsdict = _extract_class_dict(obj) clsdict.pop('__weakref__', None) # For ABCMeta in python3.7+, remove _abc_impl as it is not picklable. @@ -496,8 +622,8 @@ def save_dynamic_class(self, obj): for k in obj.__slots__: clsdict.pop(k, None) - # If type overrides __dict__ as a property, include it in the type kwargs. - # In Python 2, we can't set this attribute after construction. + # If type overrides __dict__ as a property, include it in the type + # kwargs. In Python 2, we can't set this attribute after construction. __dict__ = clsdict.pop('__dict__', None) if isinstance(__dict__, property): type_kwargs['__dict__'] = __dict__ @@ -524,8 +650,16 @@ def save_dynamic_class(self, obj): write(pickle.MARK) # Create and memoize an skeleton class with obj's name and bases. - tp = type(obj) - self.save_reduce(tp, (obj.__name__, obj.__bases__, type_kwargs), obj=obj) + if Enum is not None and issubclass(obj, Enum): + # Special handling of Enum subclasses + self._save_dynamic_enum(obj, clsdict) + else: + # "Regular" class definition: + tp = type(obj) + self.save_reduce(_make_skeleton_class, + (tp, obj.__name__, obj.__bases__, type_kwargs, + _ensure_tracking(obj), None), + obj=obj) # Now save the rest of obj's __dict__. Any references to obj # encountered while saving will point to the skeleton class. @@ -778,7 +912,7 @@ def save_inst(self, obj): save(stuff) write(pickle.BUILD) - if not PY3: # pragma: no branch + if PY2: # pragma: no branch dispatch[types.InstanceType] = save_inst def save_property(self, obj): @@ -1119,6 +1253,22 @@ def _make_skel_func(code, cell_count, base_globals=None): return types.FunctionType(code, base_globals, None, None, closure) +def _make_skeleton_class(type_constructor, name, bases, type_kwargs, + class_tracker_id, extra): + """Build dynamic class with an empty __dict__ to be filled once memoized + + If class_tracker_id is not None, try to lookup an existing class definition + matching that id. If none is found, track a newly reconstructed class + definition under that id so that other instances stemming from the same + class id will also reuse this class definition. + + The "extra" variable is meant to be a dict (or None) that can be used for + forward compatibility shall the need arise. + """ + skeleton_class = type_constructor(name, bases, type_kwargs) + return _lookup_class_or_track(class_tracker_id, skeleton_class) + + def _rehydrate_skeleton_class(skeleton_class, class_dict): """Put attributes from `class_dict` back on `skeleton_class`. @@ -1137,6 +1287,39 @@ def _rehydrate_skeleton_class(skeleton_class, class_dict): return skeleton_class +def _make_skeleton_enum(bases, name, qualname, members, module, + class_tracker_id, extra): + """Build dynamic enum with an empty __dict__ to be filled once memoized + + The creation of the enum class is inspired by the code of + EnumMeta._create_. + + If class_tracker_id is not None, try to lookup an existing enum definition + matching that id. If none is found, track a newly reconstructed enum + definition under that id so that other instances stemming from the same + class id will also reuse this enum definition. + + The "extra" variable is meant to be a dict (or None) that can be used for + forward compatibility shall the need arise. + """ + # enums always inherit from their base Enum class at the last position in + # the list of base classes: + enum_base = bases[-1] + metacls = enum_base.__class__ + classdict = metacls.__prepare__(name, bases) + + for member_name, member_value in members.items(): + classdict[member_name] = member_value + enum_class = metacls.__new__(metacls, name, bases, classdict) + enum_class.__module__ = module + + # Python 2.7 compat + if qualname is not None: + enum_class.__qualname__ = qualname + + return _lookup_class_or_track(class_tracker_id, enum_class) + + def _is_dynamic(module): """ Return True if the module is special module that cannot be imported by its @@ -1176,4 +1359,4 @@ def _reduce_method_descriptor(obj): import copy_reg as copyreg except ImportError: import copyreg - copyreg.pickle(method_descriptor, _reduce_method_descriptor) + copyreg.pickle(method_descriptor, _reduce_method_descriptor) \ No newline at end of file diff --git a/python/setup.py b/python/setup.py index ee5c32683efa..ea672309703b 100644 --- a/python/setup.py +++ b/python/setup.py @@ -230,6 +230,7 @@ def _supports_symlinks(): 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: PyPy'] )