Commit 379d2c8

Author: Davies Liu (committed)

fix zip with textFile()

1 parent 9bd9334 commit 379d2c8

File tree: 2 files changed, +15 −12 lines
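
For context, the scenario this commit addresses (and that the new test below exercises) is zipping an RDD produced by textFile() — whose data is not pickle-batched the way parallelize() output is — with another RDD. A minimal sketch, assuming a local SparkContext and a hypothetical input path; this is not part of the commit itself:

    from pyspark import SparkContext

    sc = SparkContext("local", "zip-textfile-demo")
    # "hello.txt" is a hypothetical path; the commit's test uses
    # SPARK_HOME/python/test_support/hello.txt.
    t = sc.textFile("hello.txt")
    # Before this fix, zip() involving textFile()-based RDDs could fail; the
    # rdd.py change below rebatches both sides through a pickled BatchedSerializer.
    print(t.zip(t.map(str)).count())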

python/pyspark/rdd.py

Lines changed: 10 additions & 12 deletions
@@ -1798,23 +1798,21 @@ def zip(self, other):
         def get_batch_size(ser):
             if isinstance(ser, BatchedSerializer):
                 return ser.batchSize
-            return 1
+            return 1  # not batched
 
         def batch_as(rdd, batchSize):
-            ser = rdd._jrdd_deserializer
-            if isinstance(ser, BatchedSerializer):
-                ser = ser.serializer
-            return rdd._reserialize(BatchedSerializer(ser, batchSize))
+            return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize))
 
         my_batch = get_batch_size(self._jrdd_deserializer)
         other_batch = get_batch_size(other._jrdd_deserializer)
-        # use the smallest batchSize for both of them
-        batchSize = min(my_batch, other_batch)
-        if batchSize <= 0:
-            # auto batched or unlimited
-            batchSize = 100
-        other = batch_as(other, batchSize)
-        self = batch_as(self, batchSize)
+        if my_batch != other_batch:
+            # use the smallest batchSize for both of them
+            batchSize = min(my_batch, other_batch)
+            if batchSize <= 0:
+                # auto batched or unlimited
+                batchSize = 100
+            other = batch_as(other, batchSize)
+            self = batch_as(self, batchSize)
 
         if self.getNumPartitions() != other.getNumPartitions():
             raise ValueError("Can only zip with RDD which has the same number of partitions")
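
The reworked logic only rebatches when the two RDDs disagree on batch size, and it rebatches both sides through a pickle-backed BatchedSerializer instead of reusing each RDD's existing serializer. A self-contained sketch of just the batch-size reconciliation (plain Python, no Spark required; choose_batch_size is a hypothetical helper name, not from the patch):

    def choose_batch_size(my_batch, other_batch):
        # Mirrors the reconciliation in zip(): take the smaller of the two
        # batch sizes; a value <= 0 means "auto batched or unlimited", so
        # fall back to a fixed batch of 100.
        batchSize = min(my_batch, other_batch)
        if batchSize <= 0:
            batchSize = 100
        return batchSize

    assert choose_batch_size(2, 10) == 2     # smaller explicit batch wins
    assert choose_batch_size(0, 10) == 100   # auto-batched side falls back to 100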

python/pyspark/tests.py

Lines changed: 5 additions & 0 deletions
@@ -533,6 +533,11 @@ def test_zip_with_different_serializers(self):
         a = a._reserialize(BatchedSerializer(PickleSerializer(), 2))
         b = b._reserialize(MarshalSerializer())
         self.assertEqual(a.zip(b).collect(), [(0, 100), (1, 101), (2, 102), (3, 103), (4, 104)])
+        path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+        t = self.sc.textFile(path)
+        cnt = t.count()
+        self.assertEqual(cnt, t.zip(t).count())
+        self.assertEqual(cnt, t.zip(t.map(str)).count())
 
     def test_zip_with_different_number_of_items(self):
         a = self.sc.parallelize(range(5), 2)

0 commit comments