Skip to content

Commit 20ce3a3

Browse files
author
Davies Liu
committed
fix bug in _reserialize()
1 parent e3ebf7c commit 20ce3a3

File tree

3 files changed

+11
-3
lines changed

3 files changed

+11
-3
lines changed

python/pyspark/rdd.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,7 @@ def intersection(self, other):
469469
def _reserialize(self, serializer=None):
470470
serializer = serializer or self.ctx.serializer
471471
if self._jrdd_deserializer != serializer:
472-
if not isinstance(self, PipelinedRDD):
473-
self = self.map(lambda x: x, preservesPartitioning=True)
472+
self = self.map(lambda x: x, preservesPartitioning=True)
474473
self._jrdd_deserializer = serializer
475474
return self
476475

python/pyspark/serializers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,9 @@ def dumps(self, obj):
463463
def loads(self, obj):
464464
return self.serializer.loads(zlib.decompress(obj))
465465

466+
def __eq__(self, other):
467+
return isinstance(other, CompressedSerializer) and self.serializer == other.serializer
468+
466469

467470
class UTF8Deserializer(Serializer):
468471

@@ -489,6 +492,9 @@ def load_stream(self, stream):
489492
except EOFError:
490493
return
491494

495+
def __eq__(self, other):
496+
return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode
497+
492498

493499
def read_long(stream):
494500
length = stream.read(8)

python/pyspark/tests.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,10 @@ def test_zip_with_different_serializers(self):
538538
t = self.sc.textFile(path)
539539
cnt = t.count()
540540
self.assertEqual(cnt, t.zip(t).count())
541-
self.assertEqual(cnt, t.zip(t.map(str)).count())
541+
rdd = t.map(str)
542+
self.assertEqual(cnt, t.zip(rdd).count())
543+
# regression test for bug in _reserializer()
544+
self.assertEqual(cnt, t.zip(rdd).count())
542545

543546
def test_zip_with_different_number_of_items(self):
544547
a = self.sc.parallelize(range(5), 2)

0 commit comments

Comments
 (0)