From 1b27080c79eda70119d92c38015988d0d2dbc5b6 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 12 Sep 2019 09:27:14 +0900
Subject: [PATCH 1/4] Fix

---
 .../expressions/codegen/CodeGenerator.scala   | 51 ++++++++++++-------
 .../spark/sql/execution/QueryExecution.scala  |  3 +-
 .../sql/execution/WholeStageCodegenExec.scala |  6 +--
 .../spark/sql/execution/debug/package.scala   | 24 ++++++---
 .../execution/WholeStageCodegenSuite.scala    |  6 +--
 .../sql/execution/debug/DebuggingSuite.scala  |  2 +-
 .../internal/ExecutorSideSQLConfSuite.scala   |  2 +-
 7 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 660a1dbaf0aa..fe0472e57aa9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1211,6 +1211,20 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
   }
 }
 
+/**
+ * Java bytecode statistics of a compiled class by Janino.
+ */
+case class ByteCodeStats(maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int)
+
+object ByteCodeStats {
+
+  val unavailable = ByteCodeStats(-1, -1, -1)
+
+  def apply(codeStats: (Int, Int, Int)): ByteCodeStats = {
+    ByteCodeStats(codeStats._1, codeStats._2, codeStats._3)
+  }
+}
+
 object CodeGenerator extends Logging {
 
   // This is the default value of HugeMethodLimit in the OpenJDK HotSpot JVM,
@@ -1244,7 +1258,7 @@ object CodeGenerator extends Logging {
    *
    * @return a pair of a generated class and the max bytecode size of generated functions.
    */
-  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
+  def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
     cache.get(code)
   } catch {
     // Cache.get() may wrap the original exception. See the following URL
@@ -1257,7 +1271,7 @@ object CodeGenerator extends Logging {
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
+  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
     val evaluator = new ClassBodyEvaluator()
 
     // A special classloader used to wrap the actual parent classloader of
@@ -1318,10 +1332,11 @@
   }
 
   /**
-   * Returns the max bytecode size of the generated functions by inspecting janino private fields.
-   * Also, this method updates the metrics information.
+   * Returns the bytecode statistics (max class bytecode size, max method bytecode size, and
+   * max constant pool size) of generated classes by inspecting janino private fields.
+   * Also, this method updates the metrics information.
    */
-  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
+  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
     // First retrieve the generated classes.
     val classes = {
       val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1336,11 +1351,13 @@ object CodeGenerator extends Logging {
     val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
     val codeAttrField = codeAttr.getDeclaredField("code")
     codeAttrField.setAccessible(true)
-    val codeSizes = classes.flatMap { case (_, classBytes) =>
-      CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
+    val codeStats = classes.map { case (_, classBytes) =>
+      val classCodeSize = classBytes.length
+      CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
       try {
         val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        val stats = cf.methodInfos.asScala.flatMap { method =>
+        val constPoolSize = cf.getConstantPoolSize
+        val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
           method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
             val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
             CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
@@ -1353,19 +1370,17 @@
             byteCodeSize
           }
         }
-        Some(stats)
+        (classCodeSize, methodCodeSizes.max, constPoolSize)
       } catch {
         case NonFatal(e) =>
           logWarning("Error calculating stats of compiled class.", e)
-          None
+          (classCodeSize, -1, -1)
       }
-    }.flatten
-
-    if (codeSizes.nonEmpty) {
-      codeSizes.max
-    } else {
-      0
     }
+
+    ByteCodeStats(codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
+      (Math.max(v1._1, v2._1), Math.max(v1._2, v2._2), Math.max(v1._3, v2._3))
+    })
   }
 
   /**
@@ -1380,8 +1395,8 @@
   private val cache = CacheBuilder.newBuilder()
     .maximumSize(SQLConf.get.codegenCacheMaxEntries)
     .build(
-      new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
-        override def load(code: CodeAndComment): (GeneratedClass, Int) = {
+      new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
+        override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
           val startTime = System.nanoTime()
           val result = doCompile(code)
           val endTime = System.nanoTime()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 630d062d6577..f294a56c60e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -213,7 +214,7 @@ class QueryExecution(
    *
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenToSeq(): Seq[(String, String)] = {
+  def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {
     org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ce9a6ea319d5..f723fcfac6d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -688,7 +688,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
     // try to compile and fallback if it failed
-    val (_, maxCodeSize) = try {
+    val (_, compiledCodeStats) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
@@ -698,9 +698,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
     }
 
     // Check if compiled code has a too large function
-    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+    if (compiledCodeStats.maxMethodCodeSize > sqlContext.conf.hugeMethodLimit) {
       logInfo(s"Found too long generated codes and JIT optimization might not work: " +
-        s"the bytecode size ($maxCodeSize) is above the limit " +
+        s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " +
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
         s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 03adeaaa6656..48d18c5d082c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -27,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
@@ -81,11 +82,14 @@ package object debug {
   def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
     append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
-    for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+    for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
+      val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize} " +
+        s"maxMethodCodeSize:${codeStats.maxMethodCodeSize} " +
+        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}"
+      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
       append(subtree)
       append("\nGenerated code:\n")
-      append(s"${code}\n")
+      append(s"$code\n")
     }
   }
 
@@ -95,7 +99,7 @@ package object debug {
    * @param plan the query plan for codegen
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = {
+  def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
     val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
     plan transform {
       case s: WholeStageCodegenExec =>
@@ -105,7 +109,13 @@ package object debug {
     }
     codegenSubtrees.toSeq.map { subtree =>
       val (_, source) = subtree.doCodeGen()
-      (subtree.toString, CodeFormatter.format(source))
+      val codeStats = try {
+        CodeGenerator.compile(source)._2
+      } catch {
+        case NonFatal(_) =>
+          ByteCodeStats.unavailable
+      }
+      (subtree.toString, CodeFormatter.format(source), codeStats)
     }
   }
 
@@ -130,7 +140,7 @@ package object debug {
    * @param query the streaming query for codegen
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String, ByteCodeStats)] = {
     val w = asStreamExecution(query)
     if (w.lastExecution != null) {
       codegenStringSeq(w.lastExecution.executedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index d8727d5b584f..8ae6622b9491 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
+import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
 
   ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
     val codeWithShortFunctions = genGroupByCode(3)
-    val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
+    val (_, ByteCodeStats(_, maxCodeSize1, _)) = CodeGenerator.compile(codeWithShortFunctions)
     assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
     val codeWithLongFunctions = genGroupByCode(50)
-    val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
+    val (_, ByteCodeStats(_, maxCodeSize2, _)) = CodeGenerator.compile(codeWithLongFunctions)
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 7a8da7e7669a..79a6e05e36d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -46,7 +46,7 @@ class DebuggingSuite extends SharedSparkSession {
     val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
       .queryExecution.executedPlan)
     assert(res.length == 2)
-    assert(res.forall{ case (subtree, code) =>
+    assert(res.forall{ case (subtree, code, _) =>
       subtree.contains("Range") &&
       code.contains("Object[]")})
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 94b73ec18637..c0238069afcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -100,7 +100,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
         .queryExecution.executedPlan)
       assert(res.length == 2)
-      assert(res.forall { case (_, code) =>
+      assert(res.forall { case (_, code, _) =>
        (code.contains("* Codegend pipeline") == flag) &&
        (code.contains("// input[") == flag)
      })

From fa4234c0cbdb8aaeb1360d7565f6db5eebe87f30 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 12 Sep 2019 21:20:25 +0900
Subject: [PATCH 2/4] Address reviews

---
 .../expressions/codegen/CodeGenerator.scala   | 28 +++++++++++--------
 .../spark/sql/execution/debug/package.scala   |  9 +++---
 .../execution/WholeStageCodegenSuite.scala    |  4 +--
 .../sql/execution/debug/DebuggingSuite.scala  | 26 +++++++++++++++++
 4 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index fe0472e57aa9..b7811e5afcde 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1214,15 +1214,11 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
 /**
  * Java bytecode statistics of a compiled class by Janino.
  */
-case class ByteCodeStats(maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int)
+case class ByteCodeStats(
+    maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int, numInnerClasses: Int)
 
 object ByteCodeStats {
-
-  val unavailable = ByteCodeStats(-1, -1, -1)
-
-  def apply(codeStats: (Int, Int, Int)): ByteCodeStats = {
-    ByteCodeStats(codeStats._1, codeStats._2, codeStats._3)
-  }
+  val UNAVAILABLE = ByteCodeStats(-1, -1, -1, -1)
 }
 
 object CodeGenerator extends Logging {
@@ -1332,9 +1328,9 @@
   }
 
   /**
-   * Returns the bytecode statistics (max class bytecode size, max method bytecode size, and
-   * max constant pool size) of generated classes by inspecting janino private fields.
-   * Also, this method updates the metrics information.
+   * Returns the bytecode statistics (max class bytecode size, max method bytecode size,
+   * max constant pool size, and # of inner classes) of generated classes
+   * by inspecting Janino classes. Also, this method updates the metrics information.
    */
   private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
     // First retrieve the generated classes.
@@ -1378,9 +1374,17 @@
       }
     }
 
-    ByteCodeStats(codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
+    // Computes the max values of the three metrics: class code size, method code size,
+    // and constant pool size.
+    val maxCodeStats = codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
       (Math.max(v1._1, v2._1), Math.max(v1._2, v2._2), Math.max(v1._3, v2._3))
-    })
+    }
+    ByteCodeStats(
+      maxClassCodeSize = maxCodeStats._1,
+      maxMethodCodeSize = maxCodeStats._2,
+      maxConstPoolSize = maxCodeStats._3,
+      // Minus 2 for `GeneratedClass` and an outer-most generated class
+      numInnerClasses = classes.size - 2)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 48d18c5d082c..59e28cb4e7cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -83,9 +83,10 @@ package object debug {
     val codegenSeq = codegenStringSeq(plan)
     append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
-      val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize} " +
-        s"maxMethodCodeSize:${codeStats.maxMethodCodeSize} " +
-        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}"
+      val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize}; " +
+        s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
+        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}; " +
+        s"numInnerClasses:${codeStats.numInnerClasses}"
       append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
       append(subtree)
       append("\nGenerated code:\n")
@@ -113,7 +114,7 @@ package object debug {
       CodeGenerator.compile(source)._2
     } catch {
       case NonFatal(_) =>
-        ByteCodeStats.unavailable
+        ByteCodeStats.UNAVAILABLE
     }
     (subtree.toString, CodeFormatter.format(source), codeStats)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 8ae6622b9491..7cad638745b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
 
   ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
     val codeWithShortFunctions = genGroupByCode(3)
-    val (_, ByteCodeStats(_, maxCodeSize1, _)) = CodeGenerator.compile(codeWithShortFunctions)
+    val (_, ByteCodeStats(_, maxCodeSize1, _, _)) = CodeGenerator.compile(codeWithShortFunctions)
     assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
     val codeWithLongFunctions = genGroupByCode(50)
-    val (_, ByteCodeStats(_, maxCodeSize2, _)) = CodeGenerator.compile(codeWithLongFunctions)
+    val (_, ByteCodeStats(_, maxCodeSize2, _, _)) = CodeGenerator.compile(codeWithLongFunctions)
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 79a6e05e36d3..b125f2af5ced 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -90,4 +90,30 @@ class DebuggingSuite extends SharedSparkSession {
        |   id LongType: {}
        |""".stripMargin))
   }
+
+  test("Prints bytecode statistics in debugCodegen") {
+    Seq(("SELECT sum(v) FROM VALUES(1) t(v)", (0, 0)),
+      // We expect HashAggregate uses an inner class for fast hash maps
+      // in partial aggregates with keys.
+      ("SELECT k, avg(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k", (0, 1)))
+      .foreach { case (query, (expectedNumInnerClasses0, expectedNumInnerClasses1)) =>
+
+        val executedPlan = sql(query).queryExecution.executedPlan
+        val res = codegenStringSeq(executedPlan)
+        assert(res.length == 2)
+        assert(res.forall { case (_, _, codeStats) =>
+          codeStats.maxClassCodeSize > 0 &&
+          codeStats.maxMethodCodeSize > 0 &&
+          codeStats.maxConstPoolSize > 0
+        })
+        assert(res(0)._3.numInnerClasses == expectedNumInnerClasses0)
+        assert(res(1)._3.numInnerClasses == expectedNumInnerClasses1)
+
+        val debugCodegenStr = codegenString(executedPlan)
+        assert(debugCodegenStr.contains("maxClassCodeSize:"))
+        assert(debugCodegenStr.contains("maxMethodCodeSize:"))
+        assert(debugCodegenStr.contains("maxConstantPoolSize:"))
+        assert(debugCodegenStr.contains("numInnerClasses:"))
+    }
+  }
 }

From a5885e3f279bb2f1b9171b561c33804bed69c7ae Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 13 Sep 2019 10:30:39 +0900
Subject: [PATCH 3/4] Address reviews

---
 .../expressions/codegen/CodeGenerator.scala   | 17 +++---
 .../spark/sql/execution/debug/package.scala   |  8 ++-
 .../sql/execution/debug/DebuggingSuite.scala  | 54 ++++++++++++-------
 3 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index b7811e5afcde..5e827b419f55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1230,6 +1230,9 @@ object CodeGenerator extends Logging {
   // The max valid length of method parameters in JVM.
   final val MAX_JVM_METHOD_PARAMS_LENGTH = 255
 
+  // The max number of constant pool entries in JVM.
+  final val MAX_JVM_CONSTANT_POOL_SIZE = 65535
+
   // This is the threshold over which the methods in an inner class are grouped in a single
   // method which is going to be called by the outer class instead of the many small ones
   final val MERGE_SPLIT_METHODS_THRESHOLD = 3
@@ -1374,17 +1377,13 @@
       }
     }
 
-    // Computes the max values of the three metrics: class code size, method code size,
-    // and constant pool size.
-    val maxCodeStats = codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
-      (Math.max(v1._1, v2._1), Math.max(v1._2, v2._2), Math.max(v1._3, v2._3))
-    }
+    val (classSizes, maxMethodSizes, constPoolSize) = codeStats.unzip3
     ByteCodeStats(
-      maxClassCodeSize = maxCodeStats._1,
-      maxMethodCodeSize = maxCodeStats._2,
-      maxConstPoolSize = maxCodeStats._3,
+      maxClassCodeSize = classSizes.max,
+      maxMethodCodeSize = maxMethodSizes.max,
+      maxConstPoolSize = constPoolSize.max,
       // Minus 2 for `GeneratedClass` and an outer-most generated class
-      numInnerClasses = classes.size - 2)
+      numInnerClasses = classSizes.size - 2)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 59e28cb4e7cc..06a46fe5b7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -83,9 +83,15 @@ package object debug {
     val codegenSeq = codegenStringSeq(plan)
     append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
     for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
+      val usedConstPoolRatio = if (codeStats.maxConstPoolSize > 0) {
+        val rt = 100.0 * codeStats.maxConstPoolSize / CodeGenerator.MAX_JVM_CONSTANT_POOL_SIZE
+        "(%.2f%% used)".format(rt)
+      } else {
+        ""
+      }
       val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize}; " +
         s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
-        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}; " +
+        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}$usedConstPoolRatio; " +
         s"numInnerClasses:${codeStats.numInnerClasses}"
       append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
       append(subtree)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index b125f2af5ced..d60e0bdebe98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -19,9 +19,15 @@ package org.apache.spark.sql.execution.debug
 
 import java.io.ByteArrayOutputStream
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.test.SQLTestData.TestData
+import org.apache.spark.sql.types.StructType
 
 class DebuggingSuite extends SharedSparkSession {
 
@@ -91,25 +97,37 @@ class DebuggingSuite extends SharedSparkSession {
        |""".stripMargin))
   }
 
+  case class DummyCodeGeneratorPlan(useInnerClass: Boolean)
+    extends CodegenSupport with LeafExecNode {
+    override def output: Seq[Attribute] = StructType.fromDDL("d int").toAttributes
+    override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(spark.sparkContext.emptyRDD[InternalRow])
+    override protected def doExecute(): RDD[InternalRow] = sys.error("Not used")
+    override protected def doProduce(ctx: CodegenContext): String = {
+      if (useInnerClass) {
+        val innerClassName = ctx.freshName("innerClass")
+        ctx.addInnerClass(
+          s"""
+             |public class $innerClassName {
+             |  public $innerClassName() {}
+             |}
""".stripMargin) + } + "" + } + } + test("Prints bytecode statistics in debugCodegen") { - Seq(("SELECT sum(v) FROM VALUES(1) t(v)", (0, 0)), - // We expect HashAggregate uses an inner class for fast hash maps - // in partial aggregates with keys. - ("SELECT k, avg(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k", (0, 1))) - .foreach { case (query, (expectedNumInnerClasses0, expectedNumInnerClasses1)) => - - val executedPlan = sql(query).queryExecution.executedPlan - val res = codegenStringSeq(executedPlan) - assert(res.length == 2) - assert(res.forall { case (_, _, codeStats) => - codeStats.maxClassCodeSize > 0 && - codeStats.maxMethodCodeSize > 0 && - codeStats.maxConstPoolSize > 0 - }) - assert(res(0)._3.numInnerClasses == expectedNumInnerClasses0) - assert(res(1)._3.numInnerClasses == expectedNumInnerClasses1) - - val debugCodegenStr = codegenString(executedPlan) + Seq(true, false).foreach { useInnerClass => + val plan = WholeStageCodegenExec(DummyCodeGeneratorPlan(useInnerClass))(codegenStageId = 0) + + val genCodes = codegenStringSeq(plan) + assert(genCodes.length == 1) + val (_, _, codeStats) = genCodes.head + val expectedNumInnerClasses = if (useInnerClass) 1 else 0 + assert(codeStats.maxClassCodeSize > 0 && codeStats.maxMethodCodeSize > 0 && + codeStats.maxConstPoolSize > 0 && codeStats.numInnerClasses == expectedNumInnerClasses) + + val debugCodegenStr = codegenString(plan) assert(debugCodegenStr.contains("maxClassCodeSize:")) assert(debugCodegenStr.contains("maxMethodCodeSize:")) assert(debugCodegenStr.contains("maxConstantPoolSize:")) From dfc6a4cebe6d13cfac8cff9c85d6f9f9df46c807 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 14 Sep 2019 08:28:09 +0900 Subject: [PATCH 4/4] Drop class size --- .../expressions/codegen/CodeGenerator.scala | 26 +++++++++---------- .../spark/sql/execution/debug/package.scala | 3 +-- .../execution/WholeStageCodegenSuite.scala | 4 +-- .../sql/execution/debug/DebuggingSuite.scala | 5 ++-- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 5e827b419f55..89277706233d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1214,11 +1214,10 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin /** * Java bytecode statistics of a compiled class by Janino. */ -case class ByteCodeStats( - maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int, numInnerClasses: Int) +case class ByteCodeStats(maxMethodCodeSize: Int, maxConstPoolSize: Int, numInnerClasses: Int) object ByteCodeStats { - val UNAVAILABLE = ByteCodeStats(-1, -1, -1, -1) + val UNAVAILABLE = ByteCodeStats(-1, -1, -1) } object CodeGenerator extends Logging { @@ -1255,7 +1254,7 @@ object CodeGenerator extends Logging { /** * Compile the Java source code into a Java class, using Janino. * - * @return a pair of a generated class and the max bytecode size of generated functions. + * @return a pair of a generated class and the bytecode statistics of generated functions. 
    */
   def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
     cache.get(code)
   } catch {
@@ -1309,7 +1308,7 @@
       s"\n${CodeFormatter.format(code)}"
     })
 
-    val maxCodeSize = try {
+    val codeStats = try {
       evaluator.cook("generated.java", code.body)
       updateAndGetCompilationStats(evaluator)
     } catch {
@@ -1327,13 +1326,13 @@
       throw new CompileException(msg, e.getLocation)
     }
 
-    (evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
+    (evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], codeStats)
   }
 
   /**
-   * Returns the bytecode statistics (max class bytecode size, max method bytecode size,
-   * max constant pool size, and # of inner classes) of generated classes
-   * by inspecting Janino classes. Also, this method updates the metrics information.
+   * Returns the bytecode statistics (max method bytecode size, max constant pool size, and
+   * # of inner classes) of generated classes by inspecting Janino classes.
+   * Also, this method updates the metrics information.
    */
   private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
     // First retrieve the generated classes.
@@ -1369,21 +1368,20 @@
           byteCodeSize
         }
       }
-      (classCodeSize, methodCodeSizes.max, constPoolSize)
+      (methodCodeSizes.max, constPoolSize)
     } catch {
       case NonFatal(e) =>
         logWarning("Error calculating stats of compiled class.", e)
-        (classCodeSize, -1, -1)
+        (-1, -1)
     }
   }
 
-    val (classSizes, maxMethodSizes, constPoolSize) = codeStats.unzip3
+    val (maxMethodSizes, constPoolSize) = codeStats.unzip
     ByteCodeStats(
-      maxClassCodeSize = classSizes.max,
       maxMethodCodeSize = maxMethodSizes.max,
       maxConstPoolSize = constPoolSize.max,
       // Minus 2 for `GeneratedClass` and an outer-most generated class
-      numInnerClasses = classSizes.size - 2)
+      numInnerClasses = classes.size - 2)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 06a46fe5b7bf..6a57ef2cafe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -89,8 +89,7 @@ package object debug {
       } else {
         ""
       }
-      val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize}; " +
-        s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
+      val codeStatsStr = s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
         s"maxConstantPoolSize:${codeStats.maxConstPoolSize}$usedConstPoolRatio; " +
         s"numInnerClasses:${codeStats.numInnerClasses}"
       append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 7cad638745b0..6dfb6c85dc9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {
 
   ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
     val codeWithShortFunctions = genGroupByCode(3)
-    val (_, ByteCodeStats(_, maxCodeSize1, _, _)) = CodeGenerator.compile(codeWithShortFunctions)
+    val (_, ByteCodeStats(maxCodeSize1, _, _)) = CodeGenerator.compile(codeWithShortFunctions)
     assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
     val codeWithLongFunctions = genGroupByCode(50)
-    val (_, ByteCodeStats(_, maxCodeSize2, _, _)) = CodeGenerator.compile(codeWithLongFunctions)
+    val (_, ByteCodeStats(maxCodeSize2, _, _)) = CodeGenerator.compile(codeWithLongFunctions)
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index d60e0bdebe98..9a48c1ea0f31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -124,11 +124,10 @@ class DebuggingSuite extends SharedSparkSession {
       assert(genCodes.length == 1)
       val (_, _, codeStats) = genCodes.head
       val expectedNumInnerClasses = if (useInnerClass) 1 else 0
-      assert(codeStats.maxClassCodeSize > 0 && codeStats.maxMethodCodeSize > 0 &&
-        codeStats.maxConstPoolSize > 0 && codeStats.numInnerClasses == expectedNumInnerClasses)
+      assert(codeStats.maxMethodCodeSize > 0 && codeStats.maxConstPoolSize > 0 &&
+        codeStats.numInnerClasses == expectedNumInnerClasses)
 
       val debugCodegenStr = codegenString(plan)
-      assert(debugCodegenStr.contains("maxClassCodeSize:"))
       assert(debugCodegenStr.contains("maxMethodCodeSize:"))
       assert(debugCodegenStr.contains("maxConstantPoolSize:"))
       assert(debugCodegenStr.contains("numInnerClasses:"))