Commit c8d62ba

cleanup
1 parent f651fd0 commit c8d62ba

File tree: 1 file changed, +9 -40 lines


python/pyspark/cloudpickle.py

Lines changed: 9 additions & 40 deletions
@@ -52,20 +52,11 @@
 import itertools
 from copy_reg import _extension_registry, _inverted_registry, _extension_cache
 import new
-import dis
 import traceback
 import platform

 PyImp = platform.python_implementation()

-#relevant opcodes
-STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
-DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
-LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
-
-HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
-EXTENDED_ARG = chr(dis.EXTENDED_ARG)

 import logging
 cloudLog = logging.getLogger("Cloud.Transport")
@@ -313,31 +304,16 @@ def save_function_tuple(self, func, forced_imports):
         write(pickle.REDUCE) # applies _fill_function on the tuple

     @staticmethod
-    def extract_code_globals(co):
+    def extract_code_globals(code):
         """
         Find all globals names read or written to by codeblock co
         """
-        code = co.co_code
-        names = co.co_names
-        out_names = set()
-
-        n = len(code)
-        i = 0
-        extended_arg = 0
-        while i < n:
-            op = code[i]
-
-            i = i+1
-            if op >= HAVE_ARGUMENT:
-                oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
-                extended_arg = 0
-                i = i+2
-                if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536L
-                if op in GLOBAL_OPS:
-                    out_names.add(names[oparg])
-        #print 'extracted', out_names, ' from ', names
-        return out_names
+        names = set(code.co_names)
+        if code.co_consts:   # see if nested function have any global refs
+            for const in code.co_consts:
+                if type(const) is types.CodeType:
+                    names |= CloudPickler.extract_code_globals(const)
+        return names

     def extract_func_data(self, func):
         """
@@ -348,10 +324,7 @@ def extract_func_data(self, func):

         # extract all global ref's
         func_global_refs = CloudPickler.extract_code_globals(code)
-        if code.co_consts:   # see if nested function have any global refs
-            for const in code.co_consts:
-                if type(const) is types.CodeType and const.co_names:
-                    func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const))
+
         # process all variables referenced by global environment
         f_globals = {}
         for var in func_global_refs:
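Because the recursion now lives inside extract_code_globals itself, the nested-constants walk deleted in this hunk would only re-add names the call above already returns. A quick illustrative check, not from the commit; SOME_GLOBAL, f, and g are made-up names:

    def f():
        def g():
            return SOME_GLOBAL   # resolved at call time, so f still compiles
        return g

    refs = CloudPickler.extract_code_globals(f.func_code)
    assert 'SOME_GLOBAL' in refs   # found without any extra walk here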
@@ -663,7 +636,6 @@ def save_partial(self, obj):
     def save_file(self, obj):
         """Save a file"""
         import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
-        from ..transport.adapter import SerializingAdapter

         if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
             raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
@@ -697,13 +669,10 @@ def save_file(self, obj):
         tmpfile.close()
         if tst != '':
             raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name)
-        elif fsize > SerializingAdapter.max_transmit_data:
-            raise pickle.PicklingError("Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" %
-                                       (name,SerializingAdapter.max_transmit_data))
         else:
             try:
                 tmpfile = file(name)
-                contents = tmpfile.read(SerializingAdapter.max_transmit_data)
+                contents = tmpfile.read()
                 tmpfile.close()
             except IOError:
                 raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name)
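Together with the import removed in the previous hunk, these deletions drop the module's last references to PiCloud's SerializingAdapter, so the vendored pickler no longer caps how large a pickled file may be. A hedged Python 2 usage sketch; it assumes CloudPickler keeps the (file, protocol) constructor of the stdlib pickle.Pickler it subclasses, and the path is hypothetical:

    import StringIO
    from pyspark.cloudpickle import CloudPickler

    buf = StringIO.StringIO()
    f = open('/tmp/some_input.txt')   # hypothetical readable file, any size
    CloudPickler(buf).dump(f)         # would previously raise PicklingError
    f.close()                         # once fsize exceeded max_transmit_data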
