Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions python/pyspark/cloudpickle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from __future__ import absolute_import


from pyspark.cloudpickle.cloudpickle import * # noqa
from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump # noqa

# Conform to the convention used by python serialization libraries, which
# expose their Pickler subclass at top-level under the "Pickler" name.
Pickler = CloudPickler

__version__ = '2.0.0'
__version__ = '2.2.0'
77 changes: 26 additions & 51 deletions python/pyspark/cloudpickle/cloudpickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
from __future__ import print_function

import builtins
import dis
Expand All @@ -56,7 +55,7 @@

from .compat import pickle
from collections import OrderedDict
from typing import Generic, Union, Tuple, Callable
from typing import ClassVar, Generic, Union, Tuple, Callable
from pickle import _getattribute
from importlib._bootstrap import _find_spec

Expand All @@ -66,11 +65,6 @@
except ImportError:
_typing_extensions = Literal = Final = None

if sys.version_info >= (3, 5, 3):
from typing import ClassVar
else: # pragma: no cover
ClassVar = None

if sys.version_info >= (3, 8):
from types import CellType
else:
Expand Down Expand Up @@ -327,11 +321,10 @@ def _extract_code_globals(co):
"""
out_names = _extract_code_globals_cache.get(co)
if out_names is None:
names = co.co_names
# We use a dict with None values instead of a set to get a
# deterministic order (assuming Python 3.6+) and avoid introducing
# non-deterministic pickle bytes as a result.
out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
out_names = {name: None for name in _walk_global_ops(co)}

# Declaring a function inside another one using the "def ..."
# syntax generates a constant code object corresponding to the one
Expand Down Expand Up @@ -517,13 +510,12 @@ def _builtin_type(name):

def _walk_global_ops(code):
"""
Yield (opcode, argument number) tuples for all
global-referencing instructions in *code*.
Yield referenced name for all global-referencing instructions in *code*.
"""
for instr in dis.get_instructions(code):
op = instr.opcode
if op in GLOBAL_OPS:
yield op, instr.arg
yield instr.argval


def _extract_class_dict(cls):
Expand Down Expand Up @@ -604,43 +596,21 @@ def parametrized_type_hint_getinitargs(obj):
elif type(obj) is type(ClassVar):
initargs = (ClassVar, obj.__type__)
elif type(obj) is type(Generic):
parameters = obj.__parameters__
if len(obj.__parameters__) > 0:
# in early Python 3.5, __parameters__ was sometimes
# preferred to __args__
initargs = (obj.__origin__, parameters)

else:
initargs = (obj.__origin__, obj.__args__)
initargs = (obj.__origin__, obj.__args__)
elif type(obj) is type(Union):
if sys.version_info < (3, 5, 3): # pragma: no cover
initargs = (Union, obj.__union_params__)
else:
initargs = (Union, obj.__args__)
initargs = (Union, obj.__args__)
elif type(obj) is type(Tuple):
if sys.version_info < (3, 5, 3): # pragma: no cover
initargs = (Tuple, obj.__tuple_params__)
else:
initargs = (Tuple, obj.__args__)
initargs = (Tuple, obj.__args__)
elif type(obj) is type(Callable):
if sys.version_info < (3, 5, 3): # pragma: no cover
args = obj.__args__
result = obj.__result__
if args != Ellipsis:
if isinstance(args, tuple):
args = list(args)
else:
args = [args]
(*args, result) = obj.__args__
if len(args) == 1 and args[0] is Ellipsis:
args = Ellipsis
else:
(*args, result) = obj.__args__
if len(args) == 1 and args[0] is Ellipsis:
args = Ellipsis
else:
args = list(args)
args = list(args)
initargs = (Callable, (args, result))
else: # pragma: no cover
raise pickle.PicklingError(
"Cloudpickle Error: Unknown type {}".format(type(obj))
f"Cloudpickle Error: Unknown type {type(obj)}"
)
return initargs

Expand Down Expand Up @@ -720,7 +690,7 @@ def instance(cls):


@instance
class _empty_cell_value(object):
class _empty_cell_value:
"""sentinel for empty closures
"""
@classmethod
Expand Down Expand Up @@ -749,7 +719,7 @@ def _fill_function(*args):
keys = ['globals', 'defaults', 'dict', 'module', 'closure_values']
state = dict(zip(keys, args[1:]))
else:
raise ValueError('Unexpected _fill_value arguments: %r' % (args,))
raise ValueError(f'Unexpected _fill_value arguments: {args!r}')

# - At pickling time, any dynamic global variable used by func is
# serialized by value (in state['globals']).
Expand Down Expand Up @@ -793,6 +763,12 @@ def _fill_function(*args):
return func


def _make_function(code, globals, name, argdefs, closure):
# Setting __builtins__ in globals is needed for nogil CPython.
globals["__builtins__"] = __builtins__
return types.FunctionType(code, globals, name, argdefs, closure)


def _make_empty_cell():
if False:
# trick the compiler into creating an empty cell in our lambda
Expand Down Expand Up @@ -917,15 +893,10 @@ def _make_typevar(name, bound, constraints, covariant, contravariant,


def _decompose_typevar(obj):
try:
class_tracker_id = _get_or_create_tracker_id(obj)
except TypeError: # pragma: nocover
# TypeVar instances are not weakref-able in Python 3.5.3
class_tracker_id = None
return (
obj.__name__, obj.__bound__, obj.__constraints__,
obj.__covariant__, obj.__contravariant__,
class_tracker_id,
_get_or_create_tracker_id(obj),
)


Expand All @@ -943,8 +914,12 @@ def _typevar_reduce(obj):


def _get_bases(typ):
if hasattr(typ, '__orig_bases__'):
if '__orig_bases__' in getattr(typ, '__dict__', {}):
# For generic types (see PEP 560)
# Note that simply checking `hasattr(typ, '__orig_bases__')` is not
# correct. Subclasses of a fully-parameterized generic class do not
# have `__orig_bases__` defined, but `hasattr(typ, '__orig_bases__')`
# will return True because it's defined in the base class.
bases_attr = '__orig_bases__'
else:
# For regular class objects
Expand Down
91 changes: 63 additions & 28 deletions python/pyspark/cloudpickle/cloudpickle_fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
_is_parametrized_type_hint, PYPY, cell_set,
parametrized_type_hint_getinitargs, _create_parametrized_type_hint,
builtin_code_type,
_make_dict_keys, _make_dict_values, _make_dict_items,
_make_dict_keys, _make_dict_values, _make_dict_items, _make_function,
)


if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
if pickle.HIGHEST_PROTOCOL >= 5:
# Shorthands similar to pickle.dump/pickle.dumps

def dump(obj, file, protocol=None, buffer_callback=None):
Expand Down Expand Up @@ -123,7 +123,7 @@ def _class_getnewargs(obj):


def _enum_getnewargs(obj):
members = dict((e.name, e.value) for e in obj)
members = {e.name: e.value for e in obj}
return (obj.__bases__, obj.__name__, obj.__qualname__, members,
obj.__module__, _get_or_create_tracker_id(obj), None)

Expand Down Expand Up @@ -218,7 +218,7 @@ def _class_getstate(obj):
def _enum_getstate(obj):
clsdict, slotstate = _class_getstate(obj)

members = dict((e.name, e.value) for e in obj)
members = {e.name: e.value for e in obj}
# Cleanup the clsdict that will be passed to _rehydrate_skeleton_class:
# Those attributes are already handled by the metaclass.
for attrname in ["_generate_next_value_", "_member_names_",
Expand All @@ -244,7 +244,22 @@ def _enum_getstate(obj):

def _code_reduce(obj):
"""codeobject reducer"""
if hasattr(obj, "co_linetable"): # pragma: no branch
# If you are not sure about the order of arguments, take a look at help
# of the specific type from types, for example:
# >>> from types import CodeType
# >>> help(CodeType)
if hasattr(obj, "co_exceptiontable"): # pragma: no branch
# Python 3.11 and later: there are some new attributes
# related to the enhanced exceptions.
args = (
obj.co_argcount, obj.co_posonlyargcount,
obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
obj.co_varnames, obj.co_filename, obj.co_name, obj.co_qualname,
obj.co_firstlineno, obj.co_linetable, obj.co_exceptiontable,
obj.co_freevars, obj.co_cellvars,
)
elif hasattr(obj, "co_linetable"): # pragma: no branch
# Python 3.10 and later: obj.co_lnotab is deprecated and constructor
# expects obj.co_linetable instead.
args = (
Expand All @@ -255,6 +270,18 @@ def _code_reduce(obj):
obj.co_firstlineno, obj.co_linetable, obj.co_freevars,
obj.co_cellvars
)
elif hasattr(obj, "co_nmeta"): # pragma: no cover
# "nogil" Python: modified attributes from 3.9
args = (
obj.co_argcount, obj.co_posonlyargcount,
obj.co_kwonlyargcount, obj.co_nlocals, obj.co_framesize,
obj.co_ndefaultargs, obj.co_nmeta,
obj.co_flags, obj.co_code, obj.co_consts,
obj.co_varnames, obj.co_filename, obj.co_name,
obj.co_firstlineno, obj.co_lnotab, obj.co_exc_handlers,
obj.co_jump_table, obj.co_freevars, obj.co_cellvars,
obj.co_free2reg, obj.co_cell2reg
)
elif hasattr(obj, "co_posonlyargcount"):
# Backward compat for 3.9 and older
args = (
Expand Down Expand Up @@ -534,7 +561,10 @@ class CloudPickler(Pickler):
_dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce
_dispatch_table[type(OrderedDict().values())] = _odict_values_reduce
_dispatch_table[type(OrderedDict().items())] = _odict_items_reduce

_dispatch_table[abc.abstractmethod] = _classmethod_reduce
_dispatch_table[abc.abstractclassmethod] = _classmethod_reduce
_dispatch_table[abc.abstractstaticmethod] = _classmethod_reduce
_dispatch_table[abc.abstractproperty] = _property_reduce

dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)

Expand All @@ -544,7 +574,7 @@ def _dynamic_function_reduce(self, func):
"""Reduce a function that is not pickleable via attribute lookup."""
newargs = self._function_getnewargs(func)
state = _function_getstate(func)
return (types.FunctionType, newargs, state, None, None,
return (_make_function, newargs, state, None, None,
_function_setstate)

def _function_reduce(self, obj):
Expand Down Expand Up @@ -611,6 +641,32 @@ def dump(self, obj):
raise

if pickle.HIGHEST_PROTOCOL >= 5:
def __init__(self, file, protocol=None, buffer_callback=None):
    """Initialise the pickler, defaulting to ``DEFAULT_PROTOCOL``.

    ``file`` and ``buffer_callback`` are forwarded unchanged to
    ``Pickler.__init__``; ``protocol`` is replaced by ``DEFAULT_PROTOCOL``
    when it is ``None``.
    """
    chosen_protocol = DEFAULT_PROTOCOL if protocol is None else protocol
    Pickler.__init__(
        self,
        file,
        protocol=chosen_protocol,
        buffer_callback=buffer_callback,
    )
    # Keyed by the id of a function's __globals__ attribute: functions that
    # share a global namespace at pickling time must keep sharing one at
    # unpickling time.
    self.globals_ref = {}
    self.proto = int(chosen_protocol)
else:
def __init__(self, file, protocol=None):
    """Initialise the pickler, defaulting to ``DEFAULT_PROTOCOL``.

    ``file`` is forwarded unchanged to ``Pickler.__init__``; ``protocol``
    is replaced by ``DEFAULT_PROTOCOL`` when it is ``None``.
    """
    chosen_protocol = DEFAULT_PROTOCOL if protocol is None else protocol
    Pickler.__init__(self, file, protocol=chosen_protocol)
    # Keyed by the id of a function's __globals__ attribute: functions that
    # share a global namespace at pickling time must keep sharing one at
    # unpickling time.
    self.globals_ref = {}
    # The base initialiser is expected to have set ``proto`` already.
    assert hasattr(self, 'proto')

if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
# Pickler is the C implementation of the CPython pickler and therefore
# we rely on reduce_override method to customize the pickler behavior.

# `CloudPickler.dispatch` is only left for backward compatibility - note
# that when using protocol 5, `CloudPickler.dispatch` is not an
# extension of `Pickler.dispatch` dictionary, because CloudPickler
Expand All @@ -631,17 +687,6 @@ def dump(self, obj):
# availability of both notions coincides on CPython's pickle and the
# pickle5 backport, but this may no longer be the case once PyPy
# implements protocol 5.
def __init__(self, file, protocol=None, buffer_callback=None):
if protocol is None:
protocol = DEFAULT_PROTOCOL
Pickler.__init__(
self, file, protocol=protocol, buffer_callback=buffer_callback
)
# map functions __globals__ attribute ids, to ensure that functions
# sharing the same global namespace at pickling time also share
# their global namespace at unpickling time.
self.globals_ref = {}
self.proto = int(protocol)

def reducer_override(self, obj):
"""Type-agnostic reducing callback for function and classes.
Expand Down Expand Up @@ -702,16 +747,6 @@ def reducer_override(self, obj):
# hard-coded call to save_global when pickling meta-classes.
dispatch = Pickler.dispatch.copy()

def __init__(self, file, protocol=None):
if protocol is None:
protocol = DEFAULT_PROTOCOL
Pickler.__init__(self, file, protocol=protocol)
# map functions __globals__ attribute ids, to ensure that functions
# sharing the same global namespace at pickling time also share
# their global namespace at unpickling time.
self.globals_ref = {}
assert hasattr(self, 'proto')

def _save_reduce_pickle5(self, func, args, state=None, listitems=None,
dictitems=None, state_setter=None, obj=None):
save = self.save
Expand Down