@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
3232 kryo.setRegistrationRequired(false )
3333 kryo.register(classOf [MutablePair [_, _]])
3434 kryo.register(classOf [Array [Any ]])
35+ kryo.register(classOf [scala.collection.immutable.Map $Map1 ], new MapSerializer )
3536 kryo.register(classOf [org.apache.spark.sql.catalyst.expressions.GenericRow ])
3637 kryo.register(classOf [org.apache.spark.sql.catalyst.expressions.GenericMutableRow ])
3738 kryo.register(classOf [scala.collection.mutable.ArrayBuffer [_]])
@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
7071 BigDecimal (input.readString())
7172 }
7273}
74+
/**
 * Scala immutable `Map`s have no no-arg constructor and so cannot be handled by
 * Kryo's default field serializer. Instead we flatten each map into an
 * `Array[Any]` of alternating keys and values (`[k1, v1, k2, v2, ...]`) and
 * serialize that; deserialization re-pairs adjacent elements and rebuilds the map.
 *
 * Note: write/read are symmetric, so round-tripping through this serializer
 * reproduces the original map (assuming keys/values are themselves serializable).
 */
class MapSerializer extends Serializer[Map[_, _]] {
  /** Flattens the map's entries into a single Array[Any] and delegates to Kryo. */
  def write(kryo: Kryo, output: Output, map: Map[_, _]): Unit = {
    kryo.writeObject(output, map.flatMap(e => Seq(e._1, e._2)).toArray)
  }

  /** Reads back the flat key/value array and re-pairs it into an immutable Map. */
  def read(kryo: Kryo, input: Input, tpe: Class[Map[_, _]]): Map[_, _] = {
    kryo.readObject(input, classOf[Array[Any]])
      .grouped(2) // same pairing as sliding(2, 2): non-overlapping [k, v] chunks
      .map { case Array(k, v) => (k, v) }
      .toMap
  }
}
0 commit comments