
Commit eb667f9

[SPARK-42074][SQL] Enable KryoSerializer in TPCDSQueryBenchmark to enforce SQL class registration
### What changes were proposed in this pull request?

This PR aims to enable `KryoSerializer` in `TPCDSQueryBenchmark` to enforce built-in SQL class registration.

### Why are the changes needed?

GitHub Action CI will ensure that all new SQL-related classes are registered.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs. I also manually tested like the following.

```
$ build/sbt "sql/Test/runMain org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmark --data-location /tmp/tpcds-sf-1"
...
[success] Total time: 2050 s (34:10), completed Jan 15, 2023 4:06:12 PM
```

Closes #39584 from dongjoon-hyun/SPARK-42074.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
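For context on the enforcement mechanism: with `spark.kryo.registrationRequired=true`, Kryo refuses to fall back to writing fully qualified class names for unregistered classes, so any SQL class missing from Spark's registration lists fails the benchmark run instead of passing silently. Below is a minimal, self-contained sketch of that configuration, not code from this commit; the object name and the toy job are illustrative only, and only the two `.set(...)` calls mirror the actual change.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative sketch only (not part of this commit): run a trivial job with
// Kryo registration enforcement turned on. Serializing any class that is not
// registered with Kryo now raises an error instead of degrading silently.
object KryoRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("kryo-registration-sketch")
      // The same two settings this commit adds to TPCDSQueryBenchmark:
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")

    val spark = SparkSession.builder.config(conf).getOrCreate()
    try {
      // Ints and Doubles are registered by default, so this trivial job
      // succeeds; an unregistered custom class would fail fast instead.
      val total = spark.sparkContext.parallelize(1 to 100).map(_ * 2).sum()
      println(s"total = $total")
    } finally {
      spark.stop()
    }
  }
}
```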
1 parent 5f17a40 commit eb667f9

File tree

2 files changed: +34 −0 lines


core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 32 additions & 0 deletions
```diff
@@ -46,6 +46,7 @@ import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
 import org.apache.spark.util.{BoundedPriorityQueue, ByteBufferInputStream, SerializableConfiguration, SerializableJobConf, Utils}
 import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.io.ChunkedByteBuffer

 /**
  * A Spark serializer that uses the <a href="https://code.google.com/p/kryo/">
```
```diff
@@ -220,6 +221,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(Utils.classForName("scala.math.Ordering$Reverse"))
+    kryo.register(Utils.classForName("scala.reflect.ClassTag$GenericClassTag"))
     kryo.register(classOf[ArrayBuffer[Any]])
     kryo.register(classOf[Array[Array[Byte]]])
```
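A note on the `Utils.classForName` pattern in this hunk: classes such as `scala.reflect.ClassTag$GenericClassTag` are compiler-generated and cannot be written as `classOf[...]` in source, so they are resolved reflectively at runtime. Here is a standalone sketch of the same pattern against the plain Kryo API (assuming only the `com.esotericsoftware.kryo` dependency; Spark's class-loader handling is omitted):

```scala
import com.esotericsoftware.kryo.Kryo

// Resolve a class that cannot be named via classOf[...] and register it.
val kryo = new Kryo()
kryo.setRegistrationRequired(true)

// Spark uses its own Utils.classForName wrapper to pick the right class
// loader; plain Class.forName is the simplest equivalent outside Spark.
kryo.register(Class.forName("scala.reflect.ClassTag$GenericClassTag"))
```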

```diff
@@ -466,9 +468,11 @@ private[serializer] object KryoSerializer {
   // Commonly used classes.
   private val toRegister: Seq[Class[_]] = Seq(
     ByteBuffer.allocate(1).getClass,
+    classOf[Array[ByteBuffer]],
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
+    classOf[ChunkedByteBuffer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Boolean]],
```
```diff
@@ -503,9 +507,37 @@ private[serializer] object KryoSerializer {
   // SQL / ML / MLlib classes once and then re-use that filtered list in newInstance() calls.
   private lazy val loadableSparkClasses: Seq[Class[_]] = {
     Seq(
+      "org.apache.spark.sql.catalyst.expressions.BoundReference",
+      "org.apache.spark.sql.catalyst.expressions.SortOrder",
+      "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
+      "org.apache.spark.sql.catalyst.InternalRow",
+      "org.apache.spark.sql.catalyst.InternalRow$",
+      "[Lorg.apache.spark.sql.catalyst.InternalRow;",
       "org.apache.spark.sql.catalyst.expressions.UnsafeRow",
       "org.apache.spark.sql.catalyst.expressions.UnsafeArrayData",
       "org.apache.spark.sql.catalyst.expressions.UnsafeMapData",
+      "org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering",
+      "org.apache.spark.sql.catalyst.expressions.Ascending$",
+      "org.apache.spark.sql.catalyst.expressions.NullsFirst$",
+      "org.apache.spark.sql.catalyst.trees.Origin",
+      "org.apache.spark.sql.types.IntegerType",
+      "org.apache.spark.sql.types.IntegerType$",
+      "org.apache.spark.sql.types.LongType$",
+      "org.apache.spark.sql.types.Metadata",
+      "org.apache.spark.sql.types.StringType$",
+      "org.apache.spark.sql.types.StructField",
+      "[Lorg.apache.spark.sql.types.StructField;",
+      "org.apache.spark.sql.types.StructType",
+      "[Lorg.apache.spark.sql.types.StructType;",
+      "org.apache.spark.sql.types.DateType$",
+      "org.apache.spark.sql.types.DecimalType",
+      "org.apache.spark.sql.types.Decimal$DecimalAsIfIntegral$",
+      "org.apache.spark.sql.types.Decimal$DecimalIsFractional$",
+      "org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTaskResult",
+      "org.apache.spark.sql.execution.joins.EmptyHashedRelation$",
+      "org.apache.spark.sql.execution.joins.LongHashedRelation",
+      "org.apache.spark.sql.execution.joins.LongToUnsafeRowMap",
+      "org.apache.spark.sql.execution.joins.UnsafeHashedRelation",

       "org.apache.spark.ml.attribute.Attribute",
       "org.apache.spark.ml.attribute.AttributeGroup",
```

sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -57,6 +57,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
       .set("spark.executor.memory", "3g")
       .set("spark.sql.autoBroadcastJoinThreshold", (20 * 1024 * 1024).toString)
       .set("spark.sql.crossJoin.enabled", "true")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrationRequired", "true")

     SparkSession.builder.config(conf).getOrCreate()
   }
```
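With registration now required, every class the benchmark serializes must be covered by the built-in lists above. Applications that enable the same setting for their own classes would typically go through Spark's `KryoRegistrator` extension point; the following is a hedged sketch, where `MyRecord` and `MyRegistrator` are hypothetical names used only for illustration:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application class, shown only to illustrate the extension point.
case class MyRecord(id: Long, name: String)

// Register application classes so they pass spark.kryo.registrationRequired=true.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyRecord])
    kryo.register(classOf[Array[MyRecord]])
  }
}

// Wired up alongside the two settings added by this commit:
//   .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
```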
