Skip to content

Commit a0295e1

Browse files
committed
add an option to use str in textFile()
str is much more efficient than unicode
1 parent 13f54e2 commit a0295e1

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

python/pyspark/context.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ def pickleFile(self, name, minPartitions=None):
314314
return RDD(self._jsc.objectFile(name, minPartitions), self,
315315
BatchedSerializer(PickleSerializer()))
316316

317-
def textFile(self, name, minPartitions=None):
317+
def textFile(self, name, minPartitions=None, use_unicode=True):
318318
"""
319319
Read a text file from HDFS, a local file system (available on all
320320
nodes), or any Hadoop-supported file system URI, and return it as an
@@ -329,9 +329,9 @@ def textFile(self, name, minPartitions=None):
329329
"""
330330
minPartitions = minPartitions or min(self.defaultParallelism, 2)
331331
return RDD(self._jsc.textFile(name, minPartitions), self,
332-
UTF8Deserializer())
332+
UTF8Deserializer(use_unicode))
333333

334-
def wholeTextFiles(self, path, minPartitions=None):
334+
def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
335335
"""
336336
Read a directory of text files from HDFS, a local file system
337337
(available on all nodes), or any Hadoop-supported file system
@@ -369,7 +369,7 @@ def wholeTextFiles(self, path, minPartitions=None):
369369
"""
370370
minPartitions = minPartitions or self.defaultMinPartitions
371371
return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
372-
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
372+
PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
373373

374374
def _dictToJavaMap(self, d):
375375
jm = self._jvm.java.util.HashMap()

python/pyspark/serializers.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -409,18 +409,25 @@ class UTF8Deserializer(Serializer):
409409
Deserializes streams written by String.getBytes.
410410
"""
411411

412+
def __init__(self, use_unicode=False):
413+
self.use_unicode = use_unicode
414+
412415
def loads(self, stream):
413416
length = read_int(stream)
414-
return stream.read(length).decode('utf8')
417+
return stream.read(length)
415418

416419
def load_stream(self, stream):
417-
while True:
418-
try:
419-
yield self.loads(stream)
420-
except struct.error:
421-
return
422-
except EOFError:
423-
return
420+
try:
421+
if self.use_unicode:
422+
while True:
423+
yield self.loads(stream).decode("utf-8")
424+
else:
425+
while True:
426+
yield self.loads(stream)
427+
except struct.error:
428+
return
429+
except EOFError:
430+
return
424431

425432

426433
def read_long(stream):

0 commit comments

Comments
 (0)