Skip to content

Commit e15e574

Browse files
marmbrus and pwendell
authored and committed
[SQL] Add a custom serializer for maps since they do not have a no-arg constructor.
Author: Michael Armbrust <[email protected]>

Closes apache#243 from marmbrus/mapSer and squashes the following commits:

54045f7 [Michael Armbrust] Add a custom serializer for maps since they do not have a no-arg constructor.
1 parent 32cbdfd commit e15e574

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
3232
kryo.setRegistrationRequired(false)
3333
kryo.register(classOf[MutablePair[_, _]])
3434
kryo.register(classOf[Array[Any]])
35+
kryo.register(classOf[scala.collection.immutable.Map$Map1], new MapSerializer)
3536
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
3637
kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
3738
kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
@@ -70,3 +71,20 @@ class BigDecimalSerializer extends Serializer[BigDecimal] {
7071
BigDecimal(input.readString())
7172
}
7273
}
74+
75+
/**
 * Scala immutable maps lack a no-arg constructor, so Kryo's default
 * field-based strategy cannot instantiate them. We therefore encode a map
 * as a flat `Array[Any]` of alternating keys and values, and rebuild the
 * map from key/value pairs on the way back in.
 */
class MapSerializer extends Serializer[Map[_,_]] {
  def write(kryo: Kryo, output: Output, map: Map[_,_]) {
    // Flatten each (key, value) entry into two consecutive array slots.
    val flattened = map.flatMap(entry => Seq(entry._1, entry._2)).toArray
    kryo.writeObject(output, flattened)
  }

  def read(kryo: Kryo, input: Input, tpe: Class[Map[_,_]]): Map[_,_] = {
    // Read the flat array back and pair up adjacent elements; the array
    // length is always even because `write` emits two slots per entry.
    val flattened = kryo.readObject(input, classOf[Array[Any]])
    flattened
      .grouped(2)
      .map { case Array(key, value) => (key, value) }
      .toMap
  }
}

0 commit comments

Comments (0)