Skip to content

Commit 60e4e2f

Browse files
committed
Support unpickling array.array for Python 2.6
1 parent ed1980f commit 60e4e2f

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.api.python
1919

20+
import java.nio.ByteOrder
21+
2022
import scala.collection.JavaConversions._
2123
import scala.util.Failure
2224
import scala.util.Try
@@ -28,6 +30,56 @@ import org.apache.spark.rdd.RDD
2830

2931
/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
3032
private[python] object SerDeUtil extends Logging {
33+
// Unpickle array.array generated by Python 2.6
34+
/**
 * Unpickles `array.array` objects produced by Python 2.6, which pickle with the
 * (typecode, data-string) reduce form that Pyrolite's built-in constructor does
 * not handle. Delegates every other argument shape to Pyrolite's constructor.
 */
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
  //  /* Description of types, from CPython's Modules/arraymodule.c */
  //  static struct arraydescr descriptors[] = {
  //    {'c', sizeof(char), c_getitem, c_setitem},
  //    {'b', sizeof(char), b_getitem, b_setitem},
  //    {'B', sizeof(char), BB_getitem, BB_setitem},
  //  #ifdef Py_USING_UNICODE
  //    {'u', sizeof(Py_UNICODE), u_getitem, u_setitem},
  //  #endif
  //    {'h', sizeof(short), h_getitem, h_setitem},
  //    {'H', sizeof(short), HH_getitem, HH_setitem},
  //    {'i', sizeof(int), i_getitem, i_setitem},
  //    {'I', sizeof(int), II_getitem, II_setitem},
  //    {'l', sizeof(long), l_getitem, l_setitem},
  //    {'L', sizeof(long), LL_getitem, LL_setitem},
  //    {'f', sizeof(float), f_getitem, f_setitem},
  //    {'d', sizeof(double), d_getitem, d_setitem},
  //    {'\0', 0, 0, 0} /* Sentinel */
  //  };
  // TODO: support Py_UNICODE with 2 bytes
  // FIXME: unpickling an array of float is wrong in Pyrolite, so we reverse the
  // machine code for float/double here to work around it.
  // We should fix this after Pyrolite fixes it.
  //
  // Maps an array typecode to the Pyrolite machine-code id matching this
  // platform's native byte order (the data string was written natively).
  val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
      'L' -> 11, 'l' -> 13, 'f' -> 14, 'd' -> 16, 'u' -> 21
    )
  } else {
    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
      'L' -> 10, 'l' -> 12, 'f' -> 15, 'd' -> 17, 'u' -> 20
    )
  }

  /**
   * Constructs an array from unpickled arguments.
   *
   * Python 2.6 pickles `array.array` either with a single typecode argument
   * (empty array) or with (typecode, data) where data is a string; both forms
   * are normalized here. Anything else is passed to Pyrolite's implementation.
   */
  override def construct(args: Array[Object]): Object = {
    if (args.length == 1) {
      // Typecode only: treat as an empty array by appending empty data.
      construct(args ++ Array(""))
    } else if (args.length == 2 && args(1).isInstanceOf[String]) {
      val typecode = args(0).asInstanceOf[String].charAt(0)
      // The raw bytes were widened chr-for-chr into a string; ISO-8859-1
      // (latin-1) decodes each char back to the original byte losslessly.
      val data: String = args(1).asInstanceOf[String]
      construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
    } else {
      super.construct(args)
    }
  }
}
79+
80+
/**
 * Registers [[ArrayConstructor]] with Pyrolite's Unpickler so that pickled
 * `array.array` values from Python 2.6 can be deserialized on the JVM side.
 * Must be called once before unpickling; safe to call again (re-registration
 * simply replaces the mapping).
 */
def initialize(): Unit = {
  Unpickler.registerConstructor("array", "array", new ArrayConstructor())
}
3183

3284
private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
3385
val pickle = new Pickler

python/pyspark/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ def _ensure_initialized(cls, instance=None, gateway=None):
214214
SparkContext._gateway = gateway or launch_gateway()
215215
SparkContext._jvm = SparkContext._gateway.jvm
216216
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
217+
SparkContext._jvm.SerDeUtil.initialize()
217218

218219
if instance:
219220
if (SparkContext._active_spark_context and

python/pyspark/tests.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,6 @@ def test_oldhadoop(self):
861861
conf=input_conf).collect())
862862
self.assertEqual(old_dataset, dict_data)
863863

864-
@unittest.skipIf(sys.version_info[:2] <= (2, 6), "Skipped on 2.6 until SPARK-2951 is fixed")
865864
def test_newhadoop(self):
866865
basepath = self.tempdir.name
867866
# use custom ArrayWritable types and converters to handle arrays

0 commit comments

Comments
 (0)