168 changes: 67 additions & 101 deletions python/pyspark/cloudpickle.py
@@ -52,35 +52,19 @@
import itertools
from copy_reg import _extension_registry, _inverted_registry, _extension_cache
import new
import dis
import traceback
import platform

#relevant opcodes
STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
PyImp = platform.python_implementation()

HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
EXTENDED_ARG = chr(dis.EXTENDED_ARG)

import logging
cloudLog = logging.getLogger("Cloud.Transport")

try:
import ctypes
except (MemoryError, ImportError):
logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True)
ctypes = None
PyObject_HEAD = None
else:

# for reading internal structures
PyObject_HEAD = [
('ob_refcnt', ctypes.c_size_t),
('ob_type', ctypes.c_void_p),
]

if PyImp == "PyPy":
# register builtin type in `new`
new.method = types.MethodType

try:
from cStringIO import StringIO
@@ -225,6 +209,8 @@ def save_function(self, obj, name=None, pack=struct.pack):

if themodule:
self.modules.add(themodule)
if getattr(themodule, name, None) is obj:
return self.save_global(obj, name)
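
The getattr check added above lets any function that is still reachable under its own name in its module be pickled by reference rather than by value; only genuinely dynamic functions (lambdas, nested defs, wrappers whose name no longer resolves to them) pay for full code-object serialization. A minimal sketch of the test (not part of the diff; names illustrative):

import math

def is_importable(func, module, name):
    # mirrors the new check: does module.<name> resolve back to func?
    return getattr(module, name, None) is func

print is_importable(math.sqrt, math, 'sqrt')        # True  -> save_global
print is_importable(lambda x: x, math, '<lambda>')  # False -> full serialization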

if not self.savedDjangoEnv:
#hack for django - if we detect the settings module, we transport it
@@ -306,44 +292,28 @@ def save_function_tuple(self, func, forced_imports):

# create a skeleton function object and memoize it
save(_make_skel_func)
save((code, len(closure), base_globals))
save((code, closure, base_globals))
write(pickle.REDUCE)
self.memoize(func)

# save the rest of the func data needed by _fill_function
save(f_globals)
save(defaults)
save(closure)
save(dct)
write(pickle.TUPLE)
write(pickle.REDUCE) # applies _fill_function on the tuple
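
The two-phase scheme above -- memoize a skeleton first, then REDUCE _fill_function over the remaining state -- is what lets self-referential functions survive pickling: the skeleton is in the memo before its own globals are written, so a recursive reference resolves to the half-built object instead of looping. A minimal sketch of the idea using raw code objects (not part of the diff):

import types

def template():
    return factorial                 # refers to a global not yet bound

# Phase 1: skeleton -- just the code object and empty globals.
g = {'__builtins__': __builtins__}
skel = types.FunctionType(template.func_code, g)

# Phase 2: fill -- the skeleton can now appear in its own globals,
# which is exactly the situation a recursive function creates.
g['factorial'] = skel
print skel() is skel                 # -> True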

@staticmethod
def extract_code_globals(co):
def extract_code_globals(code):
"""
Find all the global names read or written to by the given code object
"""
code = co.co_code
names = co.co_names
out_names = set()

n = len(code)
i = 0
extended_arg = 0
while i < n:
op = code[i]

i = i+1
if op >= HAVE_ARGUMENT:
oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
extended_arg = 0
i = i+2
if op == EXTENDED_ARG:
extended_arg = oparg*65536L
if op in GLOBAL_OPS:
out_names.add(names[oparg])
#print 'extracted', out_names, ' from ', names
return out_names
names = set(code.co_names)
if code.co_consts: # see if nested functions have any global refs
for const in code.co_consts:
if type(const) is types.CodeType:
names |= CloudPickler.extract_code_globals(const)
return names
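
The rewrite above reads names straight off co_names and recurses through co_consts, instead of scanning raw bytecode for *_GLOBAL opcodes. Note that co_names also lists attribute names, so the result over-approximates the true globals; that is harmless because extract_func_data below only keeps names it actually finds in the function's globals. Standalone sketch (not part of the diff):

import types

def _globals_of(code):
    # same idea: co_names plus a recursive walk over nested code objects
    names = set(code.co_names)
    for const in code.co_consts or ():
        if type(const) is types.CodeType:
            names |= _globals_of(const)
    return names

def outer():
    sorted([])                      # 'sorted' sits in outer's co_names
    return lambda: len([])          # 'len' hides in a nested code object

print _globals_of(outer.func_code)  # -> set(['sorted', 'len']) (order varies)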

def extract_func_data(self, func):
"""
Expand All @@ -354,10 +324,7 @@ def extract_func_data(self, func):

# extract all global ref's
func_global_refs = CloudPickler.extract_code_globals(code)
if code.co_consts: # see if nested function have any global refs
for const in code.co_consts:
if type(const) is types.CodeType and const.co_names:
func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const))

# process all variables referenced by global environment
f_globals = {}
for var in func_global_refs:
@@ -396,6 +363,12 @@ def get_contents(cell):

return (code, f_globals, defaults, closure, dct, base_globals)

def save_builtin_function(self, obj):
if obj.__module__ == "__builtin__":
return self.save_global(obj)
return self.save_function(obj)
dispatch[types.BuiltinFunctionType] = save_builtin_function

def save_global(self, obj, name=None, pack=struct.pack):
write = self.write
memo = self.memo
@@ -435,7 +408,7 @@ def save_global(self, obj, name=None, pack=struct.pack):
try:
klass = getattr(themodule, name)
except AttributeError, a:
#print themodule, name, obj, type(obj)
# print themodule, name, obj, type(obj)
raise pickle.PicklingError("Can't pickle builtin %s" % obj)
else:
raise
Expand Down Expand Up @@ -480,7 +453,6 @@ def save_global(self, obj, name=None, pack=struct.pack):
write(pickle.GLOBAL + modname + '\n' + name + '\n')
self.memoize(obj)
dispatch[types.ClassType] = save_global
dispatch[types.BuiltinFunctionType] = save_global
dispatch[types.TypeType] = save_global

def save_instancemethod(self, obj):
@@ -551,23 +523,39 @@ def save_property(self, obj):
dispatch[property] = save_property

def save_itemgetter(self, obj):
"""itemgetter serializer (needed for namedtuple support)
a bit of a pain as we need to read ctypes internals"""
class ItemGetterType(ctypes.Structure):
_fields_ = PyObject_HEAD + [
('nitems', ctypes.c_size_t),
('item', ctypes.py_object)
]


obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
return self.save_reduce(operator.itemgetter,
obj.item if obj.nitems > 1 else (obj.item,))

if PyObject_HEAD:
"""itemgetter serializer (needed for namedtuple support)"""
class Dummy:
def __getitem__(self, item):
return item
items = obj(Dummy())
if not isinstance(items, tuple):
items = (items, )
return self.save_reduce(operator.itemgetter, items)

if type(operator.itemgetter) is type:
dispatch[operator.itemgetter] = save_itemgetter

def save_attrgetter(self, obj):
"""attrgetter serializer"""
class Dummy(object):
def __init__(self, attrs, index=None):
self.attrs = attrs
self.index = index
def __getattribute__(self, item):
attrs = object.__getattribute__(self, "attrs")
index = object.__getattribute__(self, "index")
if index is None:
index = len(attrs)
attrs.append(item)
else:
attrs[index] = ".".join([attrs[index], item])
return type(self)(attrs, index)
attrs = []
obj(Dummy(attrs))
return self.save_reduce(operator.attrgetter, tuple(attrs))

if type(operator.attrgetter) is type:
dispatch[operator.attrgetter] = save_attrgetter
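
Both serializers above replace ctypes structure-peeking with a probe: call the getter on a dummy object that records what is asked of it, then rebuild the getter from that record (save_attrgetter's index bookkeeping additionally joins chained lookups so attrgetter('a.b') round-trips). The itemgetter case in isolation (not part of the diff):

import operator

class Recording(object):
    def __getitem__(self, item):
        return item

g = operator.itemgetter(1, 3)
items = g(Recording())                # the getter hands back (1, 3)
rebuilt = operator.itemgetter(*items)
print rebuilt(['a', 'b', 'c', 'd'])   # -> ('b', 'd')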

def save_reduce(self, func, args, state=None,
listitems=None, dictitems=None, obj=None):
@@ -660,11 +648,11 @@ def save_file(self, obj):

if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
if obj.name == '<stdout>':
if obj is sys.stdout:
return self.save_reduce(getattr, (sys,'stdout'), obj=obj)
if obj.name == '<stderr>':
if obj is sys.stderr:
return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
if obj.name == '<stdin>':
if obj is sys.stdin:
raise pickle.PicklingError("Cannot pickle standard input")
if hasattr(obj, 'isatty') and obj.isatty():
raise pickle.PicklingError("Cannot pickle files that map to tty objects")
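
Switching from name equality to object identity is the safer test: any file-like object can claim the name '<stdout>', but only the real sys.stdout should round-trip as a reference to it. A sketch of the distinction (not part of the diff):

import sys
from StringIO import StringIO

impostor = StringIO()
impostor.name = '<stdout>'        # would have satisfied the old check
print impostor is sys.stdout      # False -> treated as an ordinary stream
print sys.stdout is sys.stdout    # True  -> pickled as getattr(sys, 'stdout')
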
@@ -873,58 +861,36 @@ def _genpartial(func, args, kwds):
kwds = {}
return partial(func, *args, **kwds)


def _fill_function(func, globals, defaults, closure, dict):
def _fill_function(func, globals, defaults, dict):
""" Fills in the rest of function data into the skeleton function object
that was created via _make_skel_func().
"""
func.func_globals.update(globals)
func.func_defaults = defaults
func.func_dict = dict

if len(closure) != len(func.func_closure):
raise pickle.UnpicklingError("closure lengths don't match up")
for i in range(len(closure)):
_change_cell_value(func.func_closure[i], closure[i])

return func

def _make_skel_func(code, num_closures, base_globals = None):
def _make_cell(value):
return (lambda: value).func_closure[0]

def _reconstruct_closure(values):
return tuple([_make_cell(v) for v in values])

def _make_skel_func(code, closures, base_globals = None):
""" Creates a skeleton function object that contains just the provided
code and the reconstructed closure cells in func_closure. All other
func attributes (e.g. func_globals) are empty.
"""
#build closure (cells):
if not ctypes:
raise Exception('ctypes failed to import; cannot build function')

cellnew = ctypes.pythonapi.PyCell_New
cellnew.restype = ctypes.py_object
cellnew.argtypes = (ctypes.py_object,)
dummy_closure = tuple(map(lambda i: cellnew(None), range(num_closures)))
closure = _reconstruct_closure(closures) if closures else None

if base_globals is None:
base_globals = {}
base_globals['__builtins__'] = __builtins__

return types.FunctionType(code, base_globals,
None, None, dummy_closure)

# this piece of opaque code is needed below to modify 'cell' contents
cell_changer_code = new.code(
1, 1, 2, 0,
''.join([
chr(dis.opmap['LOAD_FAST']), '\x00\x00',
chr(dis.opmap['DUP_TOP']),
chr(dis.opmap['STORE_DEREF']), '\x00\x00',
chr(dis.opmap['RETURN_VALUE'])
]),
(), (), ('newval',), '<nowhere>', 'cell_changer', 1, '', ('c',), ()
)

def _change_cell_value(cell, newval):
""" Changes the contents of 'cell' object to newval """
return new.function(cell_changer_code, {}, None, (), (cell,))(newval)
None, None, closure)
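
The _make_cell trick above is what retires the ctypes machinery below: capturing a local in a lambda makes the interpreter allocate a real, already-populated cell object, so there is no need to create empty cells and mutate them with hand-assembled bytecode afterwards. Minimal sketch (Python 2 spelling; Python 3 calls the attribute __closure__):

def make_cell(v):
    # v is a local, so the lambda closes over it and a cell is allocated
    return (lambda: v).func_closure[0]

print make_cell(42).cell_contents               # -> 42
closure = tuple(make_cell(v) for v in 'ab')
print [c.cell_contents for c in closure]        # -> ['a', 'b']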


"""Constructors for 3rd party libraries
Note: These can never be renamed due to client compatibility issues"""
6 changes: 1 addition & 5 deletions python/pyspark/daemon.py
@@ -42,10 +42,6 @@ def worker(sock):
"""
Called by a worker process after the fork().
"""
# Redirect stdout to stderr
os.dup2(2, 1)
sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1

signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
@@ -102,6 +98,7 @@ def manager():
listen_sock.listen(max(1024, SOMAXCONN))
listen_host, listen_port = listen_sock.getsockname()
write_int(listen_port, sys.stdout)
sys.stdout.flush()

def shutdown(code):
signal.signal(SIGTERM, SIG_DFL)
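
The flush added above matters because the daemon's stdout is a pipe to the JVM and therefore block-buffered: without it, the four port bytes can sit in the userspace buffer while the parent blocks waiting to read them. For context, write_int frames the value big-endian, roughly like this (a sketch of pyspark.serializers.write_int, as the editor understands it):

import struct

def write_int(value, stream):
    # 4-byte big-endian int -- the framing the JVM side expects
    stream.write(struct.pack("!i", value))
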
@@ -115,7 +112,6 @@ def handle_sigterm(*args):
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP

# Initialization complete
sys.stdout.close()
try:
while True:
try:
10 changes: 7 additions & 3 deletions python/pyspark/serializers.py
@@ -355,7 +355,8 @@ class PickleSerializer(FramedSerializer):
def dumps(self, obj):
return cPickle.dumps(obj, 2)

loads = cPickle.loads
def loads(self, obj):
return cPickle.loads(obj)


class CloudPickleSerializer(PickleSerializer):
@@ -374,8 +375,11 @@ class MarshalSerializer(FramedSerializer):
This serializer is faster than PickleSerializer but supports fewer datatypes.
"""

dumps = marshal.dumps
loads = marshal.loads
def dumps(self, obj):
return marshal.dumps(obj)

def loads(self, obj):
return marshal.loads(obj)
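
Wrapping dumps/loads in explicit methods looks redundant on CPython, where built-in functions are not descriptors and never bind self. The likely motivation, given this patch's PyPy handling elsewhere, is that on PyPy these are plain functions and would bind as instance methods, passing self as the object to serialize. Sketch of the hazard (not part of the diff):

import marshal

class Broken(object):
    dumps = marshal.dumps

# CPython: Broken().dumps(42) == marshal.dumps(42); the built-in
#          is returned unbound from the class attribute.
# PyPy:    Broken().dumps(42) turns into marshal.dumps(self, 42)
#          and fails, because plain functions are descriptors.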


class AutoSerializer(FramedSerializer):