From 1a1fad85919553b583df9ce1aaf45ad214b61526 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 27 Oct 2022 23:55:32 -0700 Subject: [PATCH 1/2] [SPARK-XXX][PYTHON] Update cloudpickle to v2.2.0 --- python/pyspark/cloudpickle/cloudpickle.py | 77 ++++++++--------------- 1 file changed, 26 insertions(+), 51 deletions(-) diff --git a/python/pyspark/cloudpickle/cloudpickle.py b/python/pyspark/cloudpickle/cloudpickle.py index 347b386958037..317be69151ac3 100644 --- a/python/pyspark/cloudpickle/cloudpickle.py +++ b/python/pyspark/cloudpickle/cloudpickle.py @@ -40,7 +40,6 @@ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ -from __future__ import print_function import builtins import dis @@ -56,7 +55,7 @@ from .compat import pickle from collections import OrderedDict -from typing import Generic, Union, Tuple, Callable +from typing import ClassVar, Generic, Union, Tuple, Callable from pickle import _getattribute from importlib._bootstrap import _find_spec @@ -66,11 +65,6 @@ except ImportError: _typing_extensions = Literal = Final = None -if sys.version_info >= (3, 5, 3): - from typing import ClassVar -else: # pragma: no cover - ClassVar = None - if sys.version_info >= (3, 8): from types import CellType else: @@ -327,11 +321,10 @@ def _extract_code_globals(co): """ out_names = _extract_code_globals_cache.get(co) if out_names is None: - names = co.co_names # We use a dict with None values instead of a set to get a # deterministic order (assuming Python 3.6+) and avoid introducing # non-deterministic pickle bytes as a results. - out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)} + out_names = {name: None for name in _walk_global_ops(co)} # Declaring a function inside another one using the "def ..." # syntax generates a constant code object corresponding to the one @@ -517,13 +510,12 @@ def _builtin_type(name): def _walk_global_ops(code): """ - Yield (opcode, argument number) tuples for all - global-referencing instructions in *code*. + Yield referenced name for all global-referencing instructions in *code*. """ for instr in dis.get_instructions(code): op = instr.opcode if op in GLOBAL_OPS: - yield op, instr.arg + yield instr.argval def _extract_class_dict(cls): @@ -604,43 +596,21 @@ def parametrized_type_hint_getinitargs(obj): elif type(obj) is type(ClassVar): initargs = (ClassVar, obj.__type__) elif type(obj) is type(Generic): - parameters = obj.__parameters__ - if len(obj.__parameters__) > 0: - # in early Python 3.5, __parameters__ was sometimes - # preferred to __args__ - initargs = (obj.__origin__, parameters) - - else: - initargs = (obj.__origin__, obj.__args__) + initargs = (obj.__origin__, obj.__args__) elif type(obj) is type(Union): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Union, obj.__union_params__) - else: - initargs = (Union, obj.__args__) + initargs = (Union, obj.__args__) elif type(obj) is type(Tuple): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Tuple, obj.__tuple_params__) - else: - initargs = (Tuple, obj.__args__) + initargs = (Tuple, obj.__args__) elif type(obj) is type(Callable): - if sys.version_info < (3, 5, 3): # pragma: no cover - args = obj.__args__ - result = obj.__result__ - if args != Ellipsis: - if isinstance(args, tuple): - args = list(args) - else: - args = [args] + (*args, result) = obj.__args__ + if len(args) == 1 and args[0] is Ellipsis: + args = Ellipsis else: - (*args, result) = obj.__args__ - if len(args) == 1 and args[0] is Ellipsis: - args = Ellipsis - else: - args = list(args) + args = list(args) initargs = (Callable, (args, result)) else: # pragma: no cover raise pickle.PicklingError( - "Cloudpickle Error: Unknown type {}".format(type(obj)) + f"Cloudpickle Error: Unknown type {type(obj)}" ) return initargs @@ -720,7 +690,7 @@ def instance(cls): @instance -class _empty_cell_value(object): +class _empty_cell_value: """sentinel for empty closures """ @classmethod @@ -749,7 +719,7 @@ def _fill_function(*args): keys = ['globals', 'defaults', 'dict', 'module', 'closure_values'] state = dict(zip(keys, args[1:])) else: - raise ValueError('Unexpected _fill_value arguments: %r' % (args,)) + raise ValueError(f'Unexpected _fill_value arguments: {args!r}') # - At pickling time, any dynamic global variable used by func is # serialized by value (in state['globals']). @@ -793,6 +763,12 @@ def _fill_function(*args): return func +def _make_function(code, globals, name, argdefs, closure): + # Setting __builtins__ in globals is needed for nogil CPython. + globals["__builtins__"] = __builtins__ + return types.FunctionType(code, globals, name, argdefs, closure) + + def _make_empty_cell(): if False: # trick the compiler into creating an empty cell in our lambda @@ -917,15 +893,10 @@ def _make_typevar(name, bound, constraints, covariant, contravariant, def _decompose_typevar(obj): - try: - class_tracker_id = _get_or_create_tracker_id(obj) - except TypeError: # pragma: nocover - # TypeVar instances are not weakref-able in Python 3.5.3 - class_tracker_id = None return ( obj.__name__, obj.__bound__, obj.__constraints__, obj.__covariant__, obj.__contravariant__, - class_tracker_id, + _get_or_create_tracker_id(obj), ) @@ -943,8 +914,12 @@ def _typevar_reduce(obj): def _get_bases(typ): - if hasattr(typ, '__orig_bases__'): + if '__orig_bases__' in getattr(typ, '__dict__', {}): # For generic types (see PEP 560) + # Note that simply checking `hasattr(typ, '__orig_bases__')` is not + # correct. Subclasses of a fully-parameterized generic class does not + # have `__orig_bases__` defined, but `hasattr(typ, '__orig_bases__')` + # will return True because it's defined in the base class. bases_attr = '__orig_bases__' else: # For regular class objects From 307abe43a3044fa58ba52cf85e96173822ee812e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 1 Nov 2022 20:54:28 -0700 Subject: [PATCH 2/2] Address comments --- python/pyspark/cloudpickle/__init__.py | 5 +- .../pyspark/cloudpickle/cloudpickle_fast.py | 91 +++++++++++++------ 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/python/pyspark/cloudpickle/__init__.py b/python/pyspark/cloudpickle/__init__.py index 0ae79b5535c85..efbf1178d431f 100644 --- a/python/pyspark/cloudpickle/__init__.py +++ b/python/pyspark/cloudpickle/__init__.py @@ -1,6 +1,3 @@ -from __future__ import absolute_import - - from pyspark.cloudpickle.cloudpickle import * # noqa from pyspark.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump # noqa @@ -8,4 +5,4 @@ # expose their Pickler subclass at top-level under the "Pickler" name. Pickler = CloudPickler -__version__ = '2.0.0' +__version__ = '2.2.0' diff --git a/python/pyspark/cloudpickle/cloudpickle_fast.py b/python/pyspark/cloudpickle/cloudpickle_fast.py index 6db059eb858bd..8741dcbdaaa41 100644 --- a/python/pyspark/cloudpickle/cloudpickle_fast.py +++ b/python/pyspark/cloudpickle/cloudpickle_fast.py @@ -35,11 +35,11 @@ _is_parametrized_type_hint, PYPY, cell_set, parametrized_type_hint_getinitargs, _create_parametrized_type_hint, builtin_code_type, - _make_dict_keys, _make_dict_values, _make_dict_items, + _make_dict_keys, _make_dict_values, _make_dict_items, _make_function, ) -if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY: +if pickle.HIGHEST_PROTOCOL >= 5: # Shorthands similar to pickle.dump/pickle.dumps def dump(obj, file, protocol=None, buffer_callback=None): @@ -123,7 +123,7 @@ def _class_getnewargs(obj): def _enum_getnewargs(obj): - members = dict((e.name, e.value) for e in obj) + members = {e.name: e.value for e in obj} return (obj.__bases__, obj.__name__, obj.__qualname__, members, obj.__module__, _get_or_create_tracker_id(obj), None) @@ -218,7 +218,7 @@ def _class_getstate(obj): def _enum_getstate(obj): clsdict, slotstate = _class_getstate(obj) - members = dict((e.name, e.value) for e in obj) + members = {e.name: e.value for e in obj} # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class: # Those attributes are already handled by the metaclass. for attrname in ["_generate_next_value_", "_member_names_", @@ -244,7 +244,22 @@ def _enum_getstate(obj): def _code_reduce(obj): """codeobject reducer""" - if hasattr(obj, "co_linetable"): # pragma: no branch + # If you are not sure about the order of arguments, take a look at help + # of the specific type from types, for example: + # >>> from types import CodeType + # >>> help(CodeType) + if hasattr(obj, "co_exceptiontable"): # pragma: no branch + # Python 3.11 and later: there are some new attributes + # related to the enhanced exceptions. + args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, obj.co_qualname, + obj.co_firstlineno, obj.co_linetable, obj.co_exceptiontable, + obj.co_freevars, obj.co_cellvars, + ) + elif hasattr(obj, "co_linetable"): # pragma: no branch # Python 3.10 and later: obj.co_lnotab is deprecated and constructor # expects obj.co_linetable instead. args = ( @@ -255,6 +270,18 @@ def _code_reduce(obj): obj.co_firstlineno, obj.co_linetable, obj.co_freevars, obj.co_cellvars ) + elif hasattr(obj, "co_nmeta"): # pragma: no cover + # "nogil" Python: modified attributes from 3.9 + args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_framesize, + obj.co_ndefaultargs, obj.co_nmeta, + obj.co_flags, obj.co_code, obj.co_consts, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_exc_handlers, + obj.co_jump_table, obj.co_freevars, obj.co_cellvars, + obj.co_free2reg, obj.co_cell2reg + ) elif hasattr(obj, "co_posonlyargcount"): # Backward compat for 3.9 and older args = ( @@ -534,7 +561,10 @@ class CloudPickler(Pickler): _dispatch_table[type(OrderedDict().keys())] = _odict_keys_reduce _dispatch_table[type(OrderedDict().values())] = _odict_values_reduce _dispatch_table[type(OrderedDict().items())] = _odict_items_reduce - + _dispatch_table[abc.abstractmethod] = _classmethod_reduce + _dispatch_table[abc.abstractclassmethod] = _classmethod_reduce + _dispatch_table[abc.abstractstaticmethod] = _classmethod_reduce + _dispatch_table[abc.abstractproperty] = _property_reduce dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table) @@ -544,7 +574,7 @@ def _dynamic_function_reduce(self, func): """Reduce a function that is not pickleable via attribute lookup.""" newargs = self._function_getnewargs(func) state = _function_getstate(func) - return (types.FunctionType, newargs, state, None, None, + return (_make_function, newargs, state, None, None, _function_setstate) def _function_reduce(self, obj): @@ -611,6 +641,32 @@ def dump(self, obj): raise if pickle.HIGHEST_PROTOCOL >= 5: + def __init__(self, file, protocol=None, buffer_callback=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__( + self, file, protocol=protocol, buffer_callback=buffer_callback + ) + # map functions __globals__ attribute ids, to ensure that functions + # sharing the same global namespace at pickling time also share + # their global namespace at unpickling time. + self.globals_ref = {} + self.proto = int(protocol) + else: + def __init__(self, file, protocol=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__(self, file, protocol=protocol) + # map functions __globals__ attribute ids, to ensure that functions + # sharing the same global namespace at pickling time also share + # their global namespace at unpickling time. + self.globals_ref = {} + assert hasattr(self, 'proto') + + if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY: + # Pickler is the C implementation of the CPython pickler and therefore + # we rely on reduce_override method to customize the pickler behavior. + # `CloudPickler.dispatch` is only left for backward compatibility - note # that when using protocol 5, `CloudPickler.dispatch` is not an # extension of `Pickler.dispatch` dictionary, because CloudPickler @@ -631,17 +687,6 @@ def dump(self, obj): # availability of both notions coincide on CPython's pickle and the # pickle5 backport, but it may not be the case anymore when pypy # implements protocol 5 - def __init__(self, file, protocol=None, buffer_callback=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__( - self, file, protocol=protocol, buffer_callback=buffer_callback - ) - # map functions __globals__ attribute ids, to ensure that functions - # sharing the same global namespace at pickling time also share - # their global namespace at unpickling time. - self.globals_ref = {} - self.proto = int(protocol) def reducer_override(self, obj): """Type-agnostic reducing callback for function and classes. @@ -702,16 +747,6 @@ def reducer_override(self, obj): # hard-coded call to save_global when pickling meta-classes. dispatch = Pickler.dispatch.copy() - def __init__(self, file, protocol=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__(self, file, protocol=protocol) - # map functions __globals__ attribute ids, to ensure that functions - # sharing the same global namespace at pickling time also share - # their global namespace at unpickling time. - self.globals_ref = {} - assert hasattr(self, 'proto') - def _save_reduce_pickle5(self, func, args, state=None, listitems=None, dictitems=None, state_setter=None, obj=None): save = self.save