Commit c79ca67

fix serialization of nested data
1 parent 6b258b5 commit c79ca67

File tree: 1 file changed (+29 −16)

python/pyspark/sql.py

Lines changed: 29 additions & 16 deletions
@@ -24,6 +24,7 @@
 import decimal
 import datetime
 from operator import itemgetter
+import keyword
 import warnings
 
 from pyspark.rdd import RDD, PipelinedRDD
@@ -659,16 +660,15 @@ def _infer_schema_type(obj, dataType):
 _cached_cls = {}
 
 
-def _restore_object(fields, obj):
+def _restore_object(dataType, obj):
     """ Restore object during unpickling. """
-    cls = _cached_cls.get(fields)
+    # use id(dataType) as key to speed up lookup in dict
+    # Because of batched pickling, dataType will be the
+    # same object in most cases.
+    cls = _cached_cls.get(id(dataType))
     if cls is None:
-        # create a mock StructType, because nested StructType will
-        # be restored by itself
-        fs = [StructField(n, StringType, True) for n in fields]
-        dataType = StructType(fs)
         cls = _create_cls(dataType)
-        _cached_cls[fields] = cls
+        _cached_cls[id(dataType)] = cls
     return cls(obj)
 
 
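The cache lookup now keys on id(dataType) and, per the new comment, relies on batched pickling handing back the same dataType object. A minimal standalone sketch of that caching pattern, with hypothetical names (make_record_cls, Record) rather than pyspark's:

    # Cache a generated class keyed by id() of the schema object. The cached
    # class keeps a reference to the schema, so the schema stays alive and
    # its id() cannot be recycled for a different object.
    _cached = {}

    def make_record_cls(schema):
        cls = _cached.get(id(schema))
        if cls is None:
            cls = type("Record", (tuple,), {"_schema": schema})
            _cached[id(schema)] = cls
        return cls

    fields = ("a", "b")                    # stands in for a StructType
    Rec = make_record_cls(fields)
    assert make_record_cls(fields) is Rec  # same schema object -> cache hit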

@@ -703,10 +703,10 @@ def _create_properties(fields):
     ps = {}
     for i, f in enumerate(fields):
         name = f.name
-        if name.startswith("__") and name.endswith("__"):
+        if (name.startswith("__") and name.endswith("__")
+                or keyword.iskeyword(name)):
             warnings.warn("field name %s can not be accessed in Python,"
-                          "use position to access instead" % name)
-            continue
+                          "use position to access it instead" % name)
         if _has_struct(f.dataType):
             # delay creating object until accessing it
             getter = _create_getter(f.dataType, i)
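
The widened check also catches Python keywords. A small illustration of the test being added (the field names here are made up); such fields stay reachable by position, e.g. row[1]:

    import keyword

    for name in ("value", "class", "__len__"):
        hidden = (name.startswith("__") and name.endswith("__")
                  or keyword.iskeyword(name))
        # "class" and "__len__" cannot become attributes; use row[i] instead
        print(name, "positional only" if hidden else "attribute ok")
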
@@ -721,6 +721,21 @@ def _create_cls(dataType):
     Create an class by dataType
 
     The created class is similar to namedtuple, but can have nested schema.
+
+    >>> schema = _parse_schema_abstract("a b c")
+    >>> row = (1, 1.0, "str")
+    >>> schema = _infer_schema_type(row, schema)
+    >>> obj = _create_cls(schema)(row)
+    >>> import pickle
+    >>> pickle.loads(pickle.dumps(obj))
+    Row(a=1, b=1.0, c='str')
+
+    >>> row = [[1], {"key": (1, 2.0)}]
+    >>> schema = _parse_schema_abstract("a[] b{c d}")
+    >>> schema = _infer_schema_type(row, schema)
+    >>> obj = _create_cls(schema)(row)
+    >>> pickle.loads(pickle.dumps(obj))
+    Row(a=[1], b={'key': Row(c=1, d=2.0)})
     """
 
     if isinstance(dataType, ArrayType):
@@ -737,9 +752,8 @@ def __repr__(self):
                 return "[%s]" % (", ".join(repr(self[i])
                                            for i in range(len(self))))
 
-            # pickle as dict, the nested struct can be reduced by itself
             def __reduce__(self):
-                return (list, (list(self),))
+                return list.__reduce__(self)
 
         return List
 
@@ -757,9 +771,8 @@ def __repr__(self):
                 return "{%s}" % (", ".join("%r: %r" % (k, self[k])
                                            for k in self))
 
-            # pickle as dict, the nested struct can be reduced by itself
             def __reduce__(self):
-                return (dict, (dict(self),))
+                return dict.__reduce__(self)
 
         return Dict
 
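Both wrapper classes above switch from a hand-written reduction to the inherited one. A self-contained sketch of what that changes for a plain, importable list subclass (the List/Dict classes in sql.py are generated inside _create_cls, so their exact pickling path also depends on the surrounding serializer):

    import pickle

    class OldStyle(list):
        def __reduce__(self):
            # reduce to the builtin: the subclass is dropped on the way back
            return (list, (list(self),))

    class NewStyle(list):
        def __reduce__(self):
            # delegate to the inherited reducer, which records the subclass
            # (the class must be importable by name for unpickling)
            return list.__reduce__(self)

    a = pickle.loads(pickle.dumps(OldStyle([1, 2])))
    b = pickle.loads(pickle.dumps(NewStyle([1, 2])))
    print(type(a).__name__, type(b).__name__)   # list NewStyle
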

@@ -768,7 +781,7 @@ def __reduce__(self):
 
     class Row(tuple):
         """ Row in SchemaRDD """
-
+        __DATATYPE__ = dataType
         __FIELDS__ = tuple(f.name for f in dataType.fields)
 
         # create property for fast access
@@ -780,7 +793,7 @@ def __repr__(self):
                                           for n in self.__FIELDS__))
 
         def __reduce__(self):
-            return (_restore_object, (self.__FIELDS__, tuple(self)))
+            return (_restore_object, (self.__DATATYPE__, tuple(self)))
 
     return Row
 
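Row.__reduce__ now ships the full dataType instead of just the field names, so _restore_object can rebuild a class with the nested schema intact. A minimal sketch of that reduce-to-a-factory pattern, using hypothetical names (_restore, make_row_cls) in place of pyspark's:

    import pickle

    def _restore(schema, values):
        # called by pickle on load: rebuild the row class from the schema
        return make_row_cls(schema)(values)

    def make_row_cls(schema):
        class Row(tuple):
            _schema = schema            # plays the role of __DATATYPE__

            def __reduce__(self):
                # ship the schema itself, not just field names, so nested
                # structure survives the round trip
                return (_restore, (self._schema, tuple(self)))

        return Row

    schema = ("a", ("b", ("c", "d")))   # stands in for a nested StructType
    row = make_row_cls(schema)((1, (2, 3)))
    row2 = pickle.loads(pickle.dumps(row))
    print(tuple(row2), row2._schema)    # (1, (2, 3)) ('a', ('b', ('c', 'd')))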
