diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 660a1dbaf0aa..89277706233d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -1211,6 +1211,15 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
   }
 }
 
+/**
+ * Java bytecode statistics of a class compiled by Janino.
+ */
+case class ByteCodeStats(maxMethodCodeSize: Int, maxConstPoolSize: Int, numInnerClasses: Int)
+
+object ByteCodeStats {
+  val UNAVAILABLE = ByteCodeStats(-1, -1, -1)
+}
+
 object CodeGenerator extends Logging {
 
   // This is the default value of HugeMethodLimit in the OpenJDK HotSpot JVM,
@@ -1220,6 +1229,9 @@
   // The max valid length of method parameters in JVM.
   final val MAX_JVM_METHOD_PARAMS_LENGTH = 255
 
+  // The max number of constant pool entries in JVM.
+  final val MAX_JVM_CONSTANT_POOL_SIZE = 65535
+
   // This is the threshold over which the methods in an inner class are grouped in a single
   // method which is going to be called by the outer class instead of the many small ones
   final val MERGE_SPLIT_METHODS_THRESHOLD = 3
@@ -1242,9 +1254,9 @@
   /**
    * Compile the Java source code into a Java class, using Janino.
    *
-   * @return a pair of a generated class and the max bytecode size of generated functions.
+   * @return a pair of a generated class and the bytecode statistics of generated functions.
    */
-  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
+  def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
     cache.get(code)
   } catch {
     // Cache.get() may wrap the original exception. See the following URL
@@ -1257,7 +1269,7 @@
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
+  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
     val evaluator = new ClassBodyEvaluator()
 
     // A special classloader used to wrap the actual parent classloader of
@@ -1296,7 +1308,7 @@
       s"\n${CodeFormatter.format(code)}"
     })
 
-    val maxCodeSize = try {
+    val codeStats = try {
       evaluator.cook("generated.java", code.body)
       updateAndGetCompilationStats(evaluator)
     } catch {
@@ -1314,14 +1326,15 @@
         throw new CompileException(msg, e.getLocation)
     }
 
-    (evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
+    (evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], codeStats)
   }
 
   /**
-   * Returns the max bytecode size of the generated functions by inspecting janino private fields.
+   * Returns the bytecode statistics (max method bytecode size, max constant pool size, and
+   * # of inner classes) of generated classes by inspecting Janino classes.
    * Also, this method updates the metrics information.
    */
-  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
+  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
     // First retrieve the generated classes.
     val classes = {
       val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1336,11 +1349,13 @@
     val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
     val codeAttrField = codeAttr.getDeclaredField("code")
     codeAttrField.setAccessible(true)
-    val codeSizes = classes.flatMap { case (_, classBytes) =>
-      CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
+    val codeStats = classes.map { case (_, classBytes) =>
+      val classCodeSize = classBytes.length
+      CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
       try {
         val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        val stats = cf.methodInfos.asScala.flatMap { method =>
+        val constPoolSize = cf.getConstantPoolSize
+        val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
           method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
             val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
             CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
@@ -1353,19 +1368,20 @@
             byteCodeSize
           }
         }
-        Some(stats)
+        (methodCodeSizes.max, constPoolSize)
       } catch {
         case NonFatal(e) =>
           logWarning("Error calculating stats of compiled class.", e)
-          None
+          (-1, -1)
       }
-    }.flatten
-
-    if (codeSizes.nonEmpty) {
-      codeSizes.max
-    } else {
-      0
     }
+
+    val (maxMethodSizes, constPoolSize) = codeStats.unzip
+    ByteCodeStats(
+      maxMethodCodeSize = maxMethodSizes.max,
+      maxConstPoolSize = constPoolSize.max,
+      // Minus 2 for `GeneratedClass` and the outermost generated class
+      numInnerClasses = classes.size - 2)
   }
 
   /**
@@ -1380,8 +1396,8 @@
   private val cache = CacheBuilder.newBuilder()
     .maximumSize(SQLConf.get.codegenCacheMaxEntries)
     .build(
-      new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
-        override def load(code: CodeAndComment): (GeneratedClass, Int) = {
+      new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
+        override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
           val startTime = System.nanoTime()
           val result = doCompile(code)
           val endTime = System.nanoTime()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 630d062d6577..f294a56c60e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -213,7 +214,7 @@
    *
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenToSeq(): Seq[(String, String)] = {
+  def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {
     org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
   }
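Usage sketch (not part of the patch): with the widened return type above, direct callers of CodeGenerator.compile now receive the full statistics instead of a bare Int. A minimal sketch, assuming a CodeAndComment already produced by codegen; the object and method names here are hypothetical:

    import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}

    object CompileStatsExample {
      // Hypothetical helper: compiles `source` and reports the new statistics.
      def compileAndReport(source: CodeAndComment): ByteCodeStats = {
        val (_, stats) = CodeGenerator.compile(source)
        // Stats collection can fail independently of compilation, in which case
        // the patch returns ByteCodeStats.UNAVAILABLE, i.e. (-1, -1, -1).
        if (stats != ByteCodeStats.UNAVAILABLE) {
          println(s"largest method: ${stats.maxMethodCodeSize} bytes; " +
            s"constant pool: ${stats.maxConstPoolSize} of ${CodeGenerator.MAX_JVM_CONSTANT_POOL_SIZE}; " +
            s"inner classes: ${stats.numInnerClasses}")
        }
        stats
      }
    }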
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ce9a6ea319d5..f723fcfac6d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -688,7 +688,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
     // try to compile and fallback if it failed
-    val (_, maxCodeSize) = try {
+    val (_, compiledCodeStats) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
@@ -698,9 +698,9 @@
     }
 
     // Check if compiled code has a too large function
-    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+    if (compiledCodeStats.maxMethodCodeSize > sqlContext.conf.hugeMethodLimit) {
       logInfo(s"Found too long generated codes and JIT optimization might not work: " +
-        s"the bytecode size ($maxCodeSize) is above the limit " +
+        s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " +
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
         s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 03adeaaa6656..6a57ef2cafe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.Collections
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -27,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
@@ -81,11 +82,20 @@
   def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
     val codegenSeq = codegenStringSeq(plan)
     append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
-    for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
-      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
+    for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
+      val usedConstPoolRatio = if (codeStats.maxConstPoolSize > 0) {
+        val rt = 100.0 * codeStats.maxConstPoolSize / CodeGenerator.MAX_JVM_CONSTANT_POOL_SIZE
+        "(%.2f%% used)".format(rt)
+      } else {
+        ""
+      }
+      val codeStatsStr = s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
+        s"maxConstantPoolSize:${codeStats.maxConstPoolSize}$usedConstPoolRatio; " +
+        s"numInnerClasses:${codeStats.numInnerClasses}"
+      append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
       append(subtree)
       append("\nGenerated code:\n")
-      append(s"${code}\n")
+      append(s"$code\n")
     }
   }
 
@@ -95,7 +105,7 @@
    * @param plan the query plan for codegen
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = {
+  def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
     val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
     plan transform {
       case s: WholeStageCodegenExec =>
@@ -105,7 +115,13 @@
     }
     codegenSubtrees.toSeq.map { subtree =>
       val (_, source) = subtree.doCodeGen()
-      (subtree.toString, CodeFormatter.format(source))
+      val codeStats = try {
+        CodeGenerator.compile(source)._2
+      } catch {
+        case NonFatal(_) =>
+          ByteCodeStats.UNAVAILABLE
+      }
+      (subtree.toString, CodeFormatter.format(source), codeStats)
     }
   }
 
@@ -130,7 +146,7 @@
    * @param query the streaming query for codegen
    * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    */
-  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
+  def codegenStringSeq(query: StreamingQuery): Seq[(String, String, ByteCodeStats)] = {
     val w = asStreamExecution(query)
     if (w.lastExecution != null) {
       codegenStringSeq(w.lastExecution.executedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index d8727d5b584f..6dfb6c85dc9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
+import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -213,10 +213,10 @@
 
   ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
     val codeWithShortFunctions = genGroupByCode(3)
-    val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
+    val (_, ByteCodeStats(maxCodeSize1, _, _)) = CodeGenerator.compile(codeWithShortFunctions)
     assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
     val codeWithLongFunctions = genGroupByCode(50)
-    val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
+    val (_, ByteCodeStats(maxCodeSize2, _, _)) = CodeGenerator.compile(codeWithLongFunctions)
    assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
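For completeness, a sketch of how the widened codegenStringSeq result surfaces through the debug package, mirroring the query pattern the test suites below use; the object name, local[1] master, and app name are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.debug._
    import org.apache.spark.sql.functions.col

    object DebugCodegenStatsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("codegen-stats").getOrCreate()
        // Any query with WholeStageCodegen subtrees works; this one produces two.
        val plan = spark.range(10).groupBy(col("id") * 2).count().queryExecution.executedPlan
        // Each entry is now (subtree, formatted code, ByteCodeStats); the stats
        // are ByteCodeStats.UNAVAILABLE if compiling the subtree failed.
        codegenStringSeq(plan).foreach { case (_, _, stats) =>
          println(s"maxMethodCodeSize=${stats.maxMethodCodeSize}, " +
            s"maxConstPoolSize=${stats.maxConstPoolSize}, numInnerClasses=${stats.numInnerClasses}")
        }
        spark.stop()
      }
    }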
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 7a8da7e7669a..9a48c1ea0f31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -19,9 +19,15 @@ package org.apache.spark.sql.execution.debug
 
 import java.io.ByteArrayOutputStream
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.test.SQLTestData.TestData
+import org.apache.spark.sql.types.StructType
 
 class DebuggingSuite extends SharedSparkSession {
 
@@ -46,7 +52,7 @@
     val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
       .queryExecution.executedPlan)
     assert(res.length == 2)
-    assert(res.forall{ case (subtree, code) =>
+    assert(res.forall{ case (subtree, code, _) =>
       subtree.contains("Range") && code.contains("Object[]")})
   }
 
@@ -90,4 +96,41 @@
         | id LongType: {}
         |""".stripMargin))
   }
+
+  case class DummyCodeGeneratorPlan(useInnerClass: Boolean)
+    extends CodegenSupport with LeafExecNode {
+    override def output: Seq[Attribute] = StructType.fromDDL("d int").toAttributes
+    override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(spark.sparkContext.emptyRDD[InternalRow])
+    override protected def doExecute(): RDD[InternalRow] = sys.error("Not used")
+    override protected def doProduce(ctx: CodegenContext): String = {
+      if (useInnerClass) {
+        val innerClassName = ctx.freshName("innerClass")
+        ctx.addInnerClass(
+          s"""
+             |public class $innerClassName {
+             |  public $innerClassName() {}
+             |}
+           """.stripMargin)
+      }
+      ""
+    }
+  }
+
+  test("Prints bytecode statistics in debugCodegen") {
+    Seq(true, false).foreach { useInnerClass =>
+      val plan = WholeStageCodegenExec(DummyCodeGeneratorPlan(useInnerClass))(codegenStageId = 0)
+
+      val genCodes = codegenStringSeq(plan)
+      assert(genCodes.length == 1)
+      val (_, _, codeStats) = genCodes.head
+      val expectedNumInnerClasses = if (useInnerClass) 1 else 0
+      assert(codeStats.maxMethodCodeSize > 0 && codeStats.maxConstPoolSize > 0 &&
+        codeStats.numInnerClasses == expectedNumInnerClasses)
+
+      val debugCodegenStr = codegenString(plan)
+      assert(debugCodegenStr.contains("maxMethodCodeSize:"))
+      assert(debugCodegenStr.contains("maxConstantPoolSize:"))
+      assert(debugCodegenStr.contains("numInnerClasses:"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 94b73ec18637..c0238069afcc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -100,7 +100,7 @@
       val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
         .queryExecution.executedPlan)
       assert(res.length == 2)
-      assert(res.forall { case (_, code) =>
+      assert(res.forall { case (_, code, _) =>
        (code.contains("* Codegend pipeline") == flag) &&
          (code.contains("// input[") == flag)
      })
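End to end, the writeCodegen change above renders the statistics into each subtree header. A standalone sketch of just that formatting logic, with made-up numbers and MAX_JVM_CONSTANT_POOL_SIZE inlined as 65535 so it runs without Spark on the classpath:

    object HeaderFormatExample {
      def main(args: Array[String]): Unit = {
        // Made-up statistics standing in for a ByteCodeStats instance.
        val (maxMethodCodeSize, maxConstPoolSize, numInnerClasses) = (5123, 459, 1)
        // Same ratio formatting as writeCodegen; empty when stats are unavailable.
        val usedConstPoolRatio = if (maxConstPoolSize > 0) {
          "(%.2f%% used)".format(100.0 * maxConstPoolSize / 65535)
        } else {
          ""
        }
        val codeStatsStr = s"maxMethodCodeSize:$maxMethodCodeSize; " +
          s"maxConstantPoolSize:$maxConstPoolSize$usedConstPoolRatio; " +
          s"numInnerClasses:$numInnerClasses"
        // Prints:
        // == Subtree 1 / 1 (maxMethodCodeSize:5123; maxConstantPoolSize:459(0.70% used); numInnerClasses:1) ==
        println(s"== Subtree 1 / 1 ($codeStatsStr) ==")
      }
    }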