Skip to content

Commit a84d6af

Browse files
committed
SPARK-1421. Make MLlib work on Python 2.6
The reason it wasn't working was passing a bytearray to stream.write(), which is not supported in Python 2.6 but is in 2.7. (This array came from NumPy when we converted data to send it over to Java). Now we just convert those bytearrays to strings of bytes, which preserves nonprintable characters as well.
1 parent 2d0150c commit a84d6af

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

python/pyspark/mllib/__init__.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,7 @@
1919
Python bindings for MLlib.
2020
"""
2121

22-
# MLlib currently needs Python 2.7+ and NumPy 1.7+, so complain if lower
23-
24-
import sys
25-
if sys.version_info[0:2] < (2, 7):
26-
raise Exception("MLlib requires Python 2.7+")
22+
# MLlib currently needs NumPy 1.7+, so complain if lower
2723

2824
import numpy
2925
if numpy.version.version < '1.7':

python/pyspark/serializers.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
from itertools import chain, izip, product
6565
import marshal
6666
import struct
67+
import sys
6768
from pyspark import cloudpickle
6869

6970

@@ -113,6 +114,11 @@ class FramedSerializer(Serializer):
113114
where C{length} is a 32-bit integer and data is C{length} bytes.
114115
"""
115116

117+
def __init__(self):
118+
# On Python 2.6, we can't write bytearrays to streams, so we need to convert them
119+
# to strings first. Check if the version number is that old.
120+
self._only_write_strings = sys.version_info[0:2] <= (2, 6)
121+
116122
def dump_stream(self, iterator, stream):
117123
for obj in iterator:
118124
self._write_with_length(obj, stream)
@@ -127,7 +133,10 @@ def load_stream(self, stream):
127133
def _write_with_length(self, obj, stream):
128134
serialized = self.dumps(obj)
129135
write_int(len(serialized), stream)
130-
stream.write(serialized)
136+
if self._only_write_strings:
137+
stream.write(str(serialized))
138+
else:
139+
stream.write(serialized)
131140

132141
def _read_with_length(self, stream):
133142
length = read_int(stream)

0 commit comments

Comments
 (0)