Commit 71af030

davies authored and JoshRosen committed
[SPARK-3094] [PySpark] compatible with PyPy
After this patch, we can run PySpark on PyPy (tested with PyPy 2.3.1 on Mac OS X 10.9), for example:

```
PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py
```

The performance speed-up depends on the workload (from 20% to 3000%). Here are some benchmarks:

Job | CPython 2.7 | PyPy 2.3.1 | Speed up
------- | ------------ | ------------- | -------
Word Count | 41s | 15s | 2.7x
Sort | 46s | 44s | 1.05x
Stats | 174s | 3.6s | 48x

Here is the code used for the benchmarks:

```python
rdd = sc.textFile("text")

def wordcount():
    rdd.flatMap(lambda x: x.split('/'))\
       .map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collectAsMap()

def sort():
    rdd.sortBy(lambda x: x, 1).count()

def stats():
    sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats()
```

Author: Davies Liu <[email protected]>

Closes #2144 from davies/pypy and squashes the following commits:

9aed6c5 [Davies Liu] use protocol 2 in CloudPickle
4bc1f04 [Davies Liu] refactor
b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way
3ca2351 [Davies Liu] Merge branch 'master' into pypy
fae8b19 [Davies Liu] improve attrgetter, add tests
591f830 [Davies Liu] try to run tests with PyPy in run-tests
c8d62ba [Davies Liu] cleanup
f651fd0 [Davies Liu] fix tests using array with PyPy
1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways
3c1dbfe [Davies Liu] Merge branch 'master' into pypy
42fb5fa [Davies Liu] Merge branch 'master' into pypy
cb2d724 [Davies Liu] fix tests
9986692 [Davies Liu] Merge branch 'master' into pypy
25b4ca7 [Davies Liu] support PyPy
1 parent 25311c2 commit 71af030

5 files changed: +172 -118 lines

python/pyspark/cloudpickle.py

Lines changed: 67 additions & 101 deletions
```diff
@@ -52,35 +52,19 @@
 import itertools
 from copy_reg import _extension_registry, _inverted_registry, _extension_cache
 import new
-import dis
 import traceback
+import platform
 
-#relevant opcodes
-STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
-DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
-LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+PyImp = platform.python_implementation()
 
-HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
-EXTENDED_ARG = chr(dis.EXTENDED_ARG)
 
 import logging
 cloudLog = logging.getLogger("Cloud.Transport")
 
-try:
-    import ctypes
-except (MemoryError, ImportError):
-    logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True)
-    ctypes = None
-    PyObject_HEAD = None
-else:
-
-    # for reading internal structures
-    PyObject_HEAD = [
-        ('ob_refcnt', ctypes.c_size_t),
-        ('ob_type', ctypes.c_void_p),
-    ]
 
+if PyImp == "PyPy":
+    # register builtin type in `new`
+    new.method = types.MethodType
 
 try:
     from cStringIO import StringIO
```
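The `platform.python_implementation()` guard replaces module-level setup that only makes sense on CPython (bytecode opcode tables, ctypes struct peeking). A minimal standalone sketch of the detection pattern, with the PyPy branch body purely illustrative:

```python
# Sketch of the interpreter guard used above; runnable on its own.
import platform

PyImp = platform.python_implementation()  # "CPython", "PyPy", "Jython", ...

if PyImp == "PyPy":
    # PyPy-specific compatibility shims would be installed here
    print("running on PyPy")
else:
    print("running on", PyImp)
```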
```diff
@@ -225,6 +209,8 @@ def save_function(self, obj, name=None, pack=struct.pack):
 
         if themodule:
             self.modules.add(themodule)
+            if getattr(themodule, name, None) is obj:
+                return self.save_global(obj, name)
 
         if not self.savedDjangoEnv:
             #hack for django - if we detect the settings module, we transport it
```
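The two added lines short-circuit function pickling: when a function is reachable under its own name in its module, it can be pickled as a plain global reference instead of shipping its code object. A hedged standalone illustration of the same check:

```python
# If module.name resolves back to the very same object, a global
# reference ("module.name") is enough to reconstruct it on the worker.
import math

def picklable_by_reference(obj, module, name):
    return getattr(module, name, None) is obj

print(picklable_by_reference(math.sqrt, math, "sqrt"))    # True
print(picklable_by_reference(lambda x: x, math, "sqrt"))  # False
```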
```diff
@@ -306,44 +292,28 @@ def save_function_tuple(self, func, forced_imports):
 
         # create a skeleton function object and memoize it
         save(_make_skel_func)
-        save((code, len(closure), base_globals))
+        save((code, closure, base_globals))
         write(pickle.REDUCE)
         self.memoize(func)
 
         # save the rest of the func data needed by _fill_function
         save(f_globals)
         save(defaults)
-        save(closure)
         save(dct)
         write(pickle.TUPLE)
         write(pickle.REDUCE)  # applies _fill_function on the tuple
 
     @staticmethod
-    def extract_code_globals(co):
+    def extract_code_globals(code):
         """
         Find all globals names read or written to by codeblock co
         """
-        code = co.co_code
-        names = co.co_names
-        out_names = set()
-
-        n = len(code)
-        i = 0
-        extended_arg = 0
-        while i < n:
-            op = code[i]
-
-            i = i+1
-            if op >= HAVE_ARGUMENT:
-                oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
-                extended_arg = 0
-                i = i+2
-                if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536L
-                if op in GLOBAL_OPS:
-                    out_names.add(names[oparg])
-        #print 'extracted', out_names, ' from ', names
-        return out_names
+        names = set(code.co_names)
+        if code.co_consts:   # see if nested function have any global refs
+            for const in code.co_consts:
+                if type(const) is types.CodeType:
+                    names |= CloudPickler.extract_code_globals(const)
+        return names
 
     def extract_func_data(self, func):
         """
```
```diff
@@ -354,10 +324,7 @@ def extract_func_data(self, func):
 
         # extract all global ref's
         func_global_refs = CloudPickler.extract_code_globals(code)
-        if code.co_consts:   # see if nested function have any global refs
-            for const in code.co_consts:
-                if type(const) is types.CodeType and const.co_names:
-                    func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const))
+
         # process all variables referenced by global environment
         f_globals = {}
         for var in func_global_refs:
```
```diff
@@ -396,6 +363,12 @@ def get_contents(cell):
 
         return (code, f_globals, defaults, closure, dct, base_globals)
 
+    def save_builtin_function(self, obj):
+        if obj.__module__ is "__builtin__":
+            return self.save_global(obj)
+        return self.save_function(obj)
+    dispatch[types.BuiltinFunctionType] = save_builtin_function
+
     def save_global(self, obj, name=None, pack=struct.pack):
         write = self.write
         memo = self.memo
```
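`save_builtin_function` narrows the old blanket `dispatch[types.BuiltinFunctionType] = save_global` (removed in a later hunk): genuine `__builtin__` members are still pickled by name, while functions that only look "builtin" on one implementation fall through to full function pickling. A hypothetical standalone version of the dispatch decision (the patch itself compares modules with `is`, leaning on string interning):

```python
def classify(obj):
    if getattr(obj, "__module__", None) == "__builtin__":
        return "save_global"    # pickle by name, e.g. len, map
    return "save_function"      # pickle code + globals + closure

print(classify(len))  # "save_global" on Python 2
```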
```diff
@@ -435,7 +408,7 @@ def save_global(self, obj, name=None, pack=struct.pack):
             try:
                 klass = getattr(themodule, name)
             except AttributeError, a:
-                #print themodule, name, obj, type(obj)
+                # print themodule, name, obj, type(obj)
                 raise pickle.PicklingError("Can't pickle builtin %s" % obj)
         else:
             raise
```
```diff
@@ -480,7 +453,6 @@ def save_global(self, obj, name=None, pack=struct.pack):
         write(pickle.GLOBAL + modname + '\n' + name + '\n')
         self.memoize(obj)
     dispatch[types.ClassType] = save_global
-    dispatch[types.BuiltinFunctionType] = save_global
     dispatch[types.TypeType] = save_global
 
     def save_instancemethod(self, obj):
```
```diff
@@ -551,23 +523,39 @@ def save_property(self, obj):
     dispatch[property] = save_property
 
     def save_itemgetter(self, obj):
-        """itemgetter serializer (needed for namedtuple support)
-        a bit of a pain as we need to read ctypes internals"""
-        class ItemGetterType(ctypes.Structure):
-            _fields_ = PyObject_HEAD + [
-                ('nitems', ctypes.c_size_t),
-                ('item', ctypes.py_object)
-            ]
-
-
-        obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
-        return self.save_reduce(operator.itemgetter,
-                                obj.item if obj.nitems > 1 else (obj.item,))
-
-    if PyObject_HEAD:
+        """itemgetter serializer (needed for namedtuple support)"""
+        class Dummy:
+            def __getitem__(self, item):
+                return item
+        items = obj(Dummy())
+        if not isinstance(items, tuple):
+            items = (items, )
+        return self.save_reduce(operator.itemgetter, items)
+
+    if type(operator.itemgetter) is type:
         dispatch[operator.itemgetter] = save_itemgetter
 
+    def save_attrgetter(self, obj):
+        """attrgetter serializer"""
+        class Dummy(object):
+            def __init__(self, attrs, index=None):
+                self.attrs = attrs
+                self.index = index
+            def __getattribute__(self, item):
+                attrs = object.__getattribute__(self, "attrs")
+                index = object.__getattribute__(self, "index")
+                if index is None:
+                    index = len(attrs)
+                    attrs.append(item)
+                else:
+                    attrs[index] = ".".join([attrs[index], item])
+                return type(self)(attrs, index)
+        attrs = []
+        obj(Dummy(attrs))
+        return self.save_reduce(operator.attrgetter, tuple(attrs))
 
+    if type(operator.attrgetter) is type:
+        dispatch[operator.attrgetter] = save_attrgetter
 
     def save_reduce(self, func, args, state=None,
                     listitems=None, dictitems=None, obj=None):
```
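Both new serializers use the same trick: instead of peeking at CPython struct internals through ctypes, they call the opaque getter on a probe object that records what is asked of it, then rebuild an equivalent getter from the recorded names. A hedged, runnable demo of the itemgetter half:

```python
import operator

class Probe(object):
    """Returns each requested key unchanged, so the getter reveals its keys."""
    def __getitem__(self, item):
        return item

g = operator.itemgetter(1, 3)
items = g(Probe())                # probe call yields (1, 3)
if not isinstance(items, tuple):  # itemgetter(1) returns a bare value
    items = (items,)
rebuilt = operator.itemgetter(*items)
print(rebuilt([10, 11, 12, 13]))  # (11, 13)
```

The attrgetter version is more involved because dotted paths like `attrgetter('a.b')` require the probe to keep extending the recorded name on each chained attribute access.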
```diff
@@ -660,11 +648,11 @@ def save_file(self, obj):
 
         if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
             raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
-        if obj.name == '<stdout>':
+        if obj is sys.stdout:
             return self.save_reduce(getattr, (sys,'stdout'), obj=obj)
-        if obj.name == '<stderr>':
+        if obj is sys.stderr:
             return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
-        if obj.name == '<stdin>':
+        if obj is sys.stdin:
             raise pickle.PicklingError("Cannot pickle standard input")
         if hasattr(obj, 'isatty') and obj.isatty():
             raise pickle.PicklingError("Cannot pickle files that map to tty objects")
```
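Comparing against `sys.stdout`/`sys.stderr` by identity is portable, whereas the old `obj.name == '<stdout>'` check relied on CPython's conventional pseudo-file names, which PyPy does not guarantee. A trivial sketch of the new check:

```python
import sys

def describe(f):
    # Identity, not name: works even if the implementation names the
    # standard streams differently (or not at all).
    if f is sys.stdout:
        return "stdout"
    if f is sys.stderr:
        return "stderr"
    return getattr(f, "name", "<unknown>")

print(describe(sys.stdout))  # stdout
```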
```diff
@@ -873,58 +861,36 @@ def _genpartial(func, args, kwds):
         kwds = {}
     return partial(func, *args, **kwds)
 
-
-def _fill_function(func, globals, defaults, closure, dict):
+def _fill_function(func, globals, defaults, dict):
     """ Fills in the rest of function data into the skeleton function object
     that were created via _make_skel_func().
     """
     func.func_globals.update(globals)
     func.func_defaults = defaults
     func.func_dict = dict
 
-    if len(closure) != len(func.func_closure):
-        raise pickle.UnpicklingError("closure lengths don't match up")
-    for i in range(len(closure)):
-        _change_cell_value(func.func_closure[i], closure[i])
-
     return func
 
-def _make_skel_func(code, num_closures, base_globals = None):
+def _make_cell(value):
+    return (lambda: value).func_closure[0]
+
+
+def _reconstruct_closure(values):
+    return tuple([_make_cell(v) for v in values])
+
+
+def _make_skel_func(code, closures, base_globals = None):
     """ Creates a skeleton function object that contains just the provided
     code and the correct number of cells in func_closure.  All other
     func attributes (e.g. func_globals) are empty.
     """
-    #build closure (cells):
-    if not ctypes:
-        raise Exception('ctypes failed to import; cannot build function')
-
-    cellnew = ctypes.pythonapi.PyCell_New
-    cellnew.restype = ctypes.py_object
-    cellnew.argtypes = (ctypes.py_object,)
-    dummy_closure = tuple(map(lambda i: cellnew(None), range(num_closures)))
+    closure = _reconstruct_closure(closures) if closures else None
 
     if base_globals is None:
         base_globals = {}
     base_globals['__builtins__'] = __builtins__
 
     return types.FunctionType(code, base_globals,
-                              None, None, dummy_closure)
-
-# this piece of opaque code is needed below to modify 'cell' contents
-cell_changer_code = new.code(
-    1, 1, 2, 0,
-    ''.join([
-        chr(dis.opmap['LOAD_FAST']), '\x00\x00',
-        chr(dis.opmap['DUP_TOP']),
-        chr(dis.opmap['STORE_DEREF']), '\x00\x00',
-        chr(dis.opmap['RETURN_VALUE'])
-    ]),
-    (), (), ('newval',), '<nowhere>', 'cell_changer', 1, '', ('c',), ()
-)
-
-def _change_cell_value(cell, newval):
-    """ Changes the contents of 'cell' object to newval """
-    return new.function(cell_changer_code, {}, None, (), (cell,))(newval)
+                              None, None, closure)
 
 """Constructors for 3rd party libraries
 Note: These can never be renamed due to client compatibility issues"""
```
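The `ctypes.pythonapi.PyCell_New` hack and the hand-assembled `cell_changer_code` bytecode are gone: closing over a value in a lambda produces a real cell object on any implementation, so the closure can be rebuilt up front and passed straight to `types.FunctionType`. A minimal sketch (Python 2 spelling; Python 3 uses `__closure__` but `cell_contents` works the same way):

```python
def _make_cell(value):
    # The lambda's closure holds exactly one cell containing `value`.
    return (lambda: value).func_closure[0]

def _reconstruct_closure(values):
    return tuple(_make_cell(v) for v in values)

cells = _reconstruct_closure([1, 2, 3])
print(cells[0].cell_contents)  # 1
```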

python/pyspark/daemon.py

Lines changed: 1 addition & 5 deletions
```diff
@@ -42,10 +42,6 @@ def worker(sock):
     """
     Called by a worker process after the fork().
     """
-    # Redirect stdout to stderr
-    os.dup2(2, 1)
-    sys.stdout = sys.stderr  # The sys.stdout object is different from file descriptor 1
-
     signal.signal(SIGHUP, SIG_DFL)
     signal.signal(SIGCHLD, SIG_DFL)
     signal.signal(SIGTERM, SIG_DFL)
```
```diff
@@ -102,6 +98,7 @@ def manager():
     listen_sock.listen(max(1024, SOMAXCONN))
     listen_host, listen_port = listen_sock.getsockname()
     write_int(listen_port, sys.stdout)
+    sys.stdout.flush()
 
     def shutdown(code):
         signal.signal(SIGTERM, SIG_DFL)
```
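With the stdout redirection removed in the previous hunk, the daemon now reports its listening port to the JVM over a normal, buffered stdout, so an explicit flush is needed for the parent to see the port. A hedged sketch of the framing; `write_int` here is reproduced to match PySpark's big-endian 4-byte helper, and the port number is made up:

```python
import struct
import sys

def write_int(value, stream):
    # PySpark frames integers as 4 big-endian bytes
    stream.write(struct.pack("!i", value))

write_int(8765, sys.stdout)   # hypothetical port number
sys.stdout.flush()            # without the flush, the parent JVM may block
                              # waiting on bytes stuck in the buffer
```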
```diff
@@ -115,7 +112,6 @@ def handle_sigterm(*args):
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
     # Initialization complete
-    sys.stdout.close()
     try:
         while True:
             try:
```

python/pyspark/serializers.py

Lines changed: 7 additions & 3 deletions
```diff
@@ -355,7 +355,8 @@ class PickleSerializer(FramedSerializer):
     def dumps(self, obj):
         return cPickle.dumps(obj, 2)
 
-    loads = cPickle.loads
+    def loads(self, obj):
+        return cPickle.loads(obj)
 
 
 class CloudPickleSerializer(PickleSerializer):
```
```diff
@@ -374,8 +375,11 @@ class MarshalSerializer(FramedSerializer):
     This serializer is faster than PickleSerializer but supports fewer datatypes.
     """
 
-    dumps = marshal.dumps
-    loads = marshal.loads
+    def dumps(self, obj):
+        return marshal.dumps(obj)
+
+    def loads(self, obj):
+        return marshal.loads(obj)
 
 
 class AutoSerializer(FramedSerializer):
```
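Wrapping `cPickle.loads` and `marshal.dumps`/`loads` in explicit methods matters on PyPy: there these are plain Python-level functions, so storing them directly as class attributes would turn them into unbound methods that swallow `self` as their first argument. On CPython they are built-in functions, so the shortcut happened to work. A hedged standalone illustration:

```python
import marshal

class MarshalSerializer(object):
    # Explicit wrappers consume `self`, so behavior is identical on
    # CPython and PyPy.
    def dumps(self, obj):
        return marshal.dumps(obj)

    def loads(self, obj):
        return marshal.loads(obj)

ser = MarshalSerializer()
print(ser.loads(ser.dumps([1, 2, 3])))  # [1, 2, 3]
```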
