diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f860b2a08657..5e2bdd8085e8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -46,6 +46,7 @@ import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatu
 import org.apache.spark.storage._
 import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * A Spark serializer that uses the
@@ -220,6 +221,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
+    kryo.register(Utils.classForName("scala.reflect.ClassTag$GenericClassTag"))
 
     kryo.register(classOf[ArrayBuffer[Any]])
     kryo.register(classOf[Array[Array[Byte]]])
@@ -466,9 +468,11 @@ private[serializer] object KryoSerializer {
   // Commonly used classes.
   private val toRegister: Seq[Class[_]] = Seq(
     ByteBuffer.allocate(1).getClass,
+    classOf[Array[ByteBuffer]],
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
+    classOf[ChunkedByteBuffer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Boolean]],
@@ -503,9 +507,37 @@ private[serializer] object KryoSerializer {
   // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls.
   private lazy val loadableSparkClasses: Seq[Class[_]] = {
     Seq(
+      "org.apache.spark.sql.catalyst.expressions.BoundReference",
+      "org.apache.spark.sql.catalyst.expressions.SortOrder",
+      "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
+      "org.apache.spark.sql.catalyst.InternalRow",
+      "org.apache.spark.sql.catalyst.InternalRow$",
+      "[Lorg.apache.spark.sql.catalyst.InternalRow;",
       "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
       "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
       "org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
+      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+      "org.apache.spark.sql.catalyst.expressions.Ascending$",
+      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+      "org.apache.spark.sql.catalyst.trees.Origin",
+      "org.apache.spark.sql.types.IntegerType",
+      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.types.LongType$",
+      "org.apache.spark.sql.types.Metadata",
+      "org.apache.spark.sql.types.StringType$",
+      "org.apache.spark.sql.types.StructField",
+      "[Lorg.apache.spark.sql.types.StructField;",
+      "org.apache.spark.sql.types.StructType",
+      "[Lorg.apache.spark.sql.types.StructType;",
+      "org.apache.spark.sql.types.DateType$",
+      "org.apache.spark.sql.types.DecimalType",
+      "org.apache.spark.sql.types.Decimal$DecimalAsIfIntegral$",
+      "org.apache.spark.sql.types.Decimal$DecimalIsFractional$",
+      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+      "org.apache.spark.sql.execution.joins.LongHashedRelation",
+      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",
 
       "org.apache.spark.ml.attribute.Attribute",
       "org.apache.spark.ml.attribute.AttributeGroup",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index be0b89bc6bc2..ab63ebb1050e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -57,6 +57,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
       .set("spark.executor.memory", "3g")
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")
 
     SparkSession.builder.config(conf).getOrCreate()
   }