diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
index 7b398f424cea..60e600d8dbd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
@@ -89,14 +89,6 @@ object CodeFormatter {
     }
     new CodeAndComment(code.result().trim(), map)
   }
-
-  def stripExtraNewLinesAndComments(input: String): String = {
-    val commentReg =
-      ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" +      // strip /*comment*/
-        """([ |\t]*?\/\/[\s\S]*?\n)""").r                // strip //comment
-    val codeWithoutComment = commentReg.replaceAllIn(input, "")
-    codeWithoutComment.replaceAll("""\n\s*\n""", "\n")   // strip ExtraNewLines
-  }
 }
 
 private class CodeFormatter {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f3b45799c568..f9c5ef843908 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -373,20 +373,6 @@ class CodegenContext {
    */
   private val placeHolderToComments = new mutable.HashMap[String, String]
 
-  /**
-   * It will count the lines of every Java function generated by whole-stage codegen,
-   * if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction,
-   * it will return true.
-   */
-  def isTooLongGeneratedFunction: Boolean = {
-    classFunctions.values.exists { _.values.exists {
-      code =>
-        val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
-        codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
-      }
-    }
-  }
-
   /**
    * Returns a term name that is unique within this instance of a `CodegenContext`.
    */
@@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
 }
 
 object CodeGenerator extends Logging {
+
+  // This is the value of HugeMethodLimit in the OpenJDK JVM settings
+  val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
+
   /**
    * Compile the Java source code into a Java class, using Janino.
+   *
+   * @return a pair of a generated class and the max bytecode size of generated functions.
    */
-  def compile(code: CodeAndComment): GeneratedClass = try {
+  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
     cache.get(code)
   } catch {
     // Cache.get() may wrap the original exception. See the following URL
@@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
+  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
     val evaluator = new ClassBodyEvaluator()
 
     // A special classloader used to wrap the actual parent classloader of
@@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
       s"\n${CodeFormatter.format(code)}"
     })
 
-    try {
+    val maxCodeSize = try {
       evaluator.cook("generated.java", code.body)
-      recordCompilationStats(evaluator)
+      updateAndGetCompilationStats(evaluator)
     } catch {
       case e: JaninoRuntimeException =>
         val msg = s"failed to compile: $e"
@@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
         logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
         throw new CompileException(msg, e.getLocation)
     }
-    evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
+
+    (evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
   }
 
   /**
-   * Records the generated class and method bytecode sizes by inspecting janino private fields.
+   * Returns the max bytecode size of the generated functions by inspecting janino private fields.
+   * Also, this method updates the metrics information.
    */
-  private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
+  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
     // First retrieve the generated classes.
     val classes = {
       val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
     val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
     val codeAttrField = codeAttr.getDeclaredField("code")
     codeAttrField.setAccessible(true)
-    classes.foreach { case (_, classBytes) =>
+    val codeSizes = classes.flatMap { case (_, classBytes) =>
       CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
       try {
         val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        cf.methodInfos.asScala.foreach { method =>
-          method.getAttributes().foreach { a =>
-            if (a.getClass.getName == codeAttr.getName) {
-              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
-            }
+        val stats = cf.methodInfos.asScala.flatMap { method =>
+          method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+            val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
+            CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
+            byteCodeSize
           }
         }
+        Some(stats)
       } catch {
         case NonFatal(e) =>
           logWarning("Error calculating stats of compiled class.", e)
+          None
       }
-    }
+    }.flatten
+
+    codeSizes.max
   }
 
   /**
@@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
   private val cache = CacheBuilder.newBuilder()
     .maximumSize(100)
     .build(
-      new CacheLoader[CodeAndComment, GeneratedClass]() {
-        override def load(code: CodeAndComment): GeneratedClass = {
+      new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
+        override def load(code: CodeAndComment): (GeneratedClass, Int) = {
           val startTime = System.nanoTime()
           val result = doCompile(code)
           val endTime = System.nanoTime()
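The key API change above: `CodeGenerator.compile` now returns the maximum method bytecode size alongside the generated class, and the Guava cache stores the pair, so the size is computed once per distinct source. A minimal sketch of how a caller can consume the new contract (`compileWithSizeCheck` and `limit` are illustrative names, not Spark APIs):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator, GeneratedClass}

// Hypothetical helper: compile generated source and reject it when any
// single method exceeds the caller's bytecode-size budget.
def compileWithSizeCheck(source: CodeAndComment, limit: Int): Option[GeneratedClass] = {
  // compile() now returns (GeneratedClass, Int); the Int is the largest
  // Code-attribute length among the methods of the compiled class.
  val (clazz, maxCodeSize) = CodeGenerator.compile(source)
  if (maxCodeSize > limit) None else Some(clazz)
}
```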
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 3768dcde00a4..b5429fade53c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 4e4789598520..1639d1b9dda1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index e35b9dda6c01..e0fabad6d089 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 192701a82968..1e4ac3f2afd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
+    val (clazz, _) = CodeGenerator.compile(code)
     val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
-    c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
+    clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index f2a66efc98e7..4bd50aee0551 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index 4aa5ec82471e..6bc72a0d75c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
     val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
     logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a73d168b9b6..58323740b80c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -575,15 +576,15 @@ object SQLConf {
       "disable logging or -1 to apply no limit.")
     .createWithDefault(1000)
 
-  val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
+  val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
     .internal()
-    .doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
-      "When the generated function exceeds this threshold, " +
+    .doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
+      "codegen. When the compiled function exceeds this threshold, " +
       "the whole-stage codegen is deactivated for this subtree of the current query plan. " +
-      "The default value 4000 is the max length of byte code JIT supported " +
-      "for a single function(8000) divided by 2.")
+      s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
+      "this is a limit in the OpenJDK JVM implementation.")
     .intConf
-    .createWithDefault(4000)
+    .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
@@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {
 
   def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
 
-  def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
+  def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
 
   def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
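The new conf ties the threshold to the JVM rather than to source lines: HotSpot refuses to JIT-compile any method whose bytecode exceeds `-XX:HugeMethodLimit` (8000 by default in OpenJDK), so comparing against the same number flags exactly the methods that would stay interpreted. A hedged usage example (the conf is internal, so this is for experiments only; `spark` is an existing `SparkSession` and 1500 an arbitrary value):

```scala
// Lower the threshold to force the whole-stage fallback more aggressively.
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "1500")
// Equivalently, via SQL:
spark.sql("SET spark.sql.codegen.hugeMethodLimit=1500")
```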
" + - "The default value 4000 is the max length of byte code JIT supported " + - "for a single function(8000) divided by 2.") + s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " + + "this is a limit in the OpenJDK JVM implementation.") .intConf - .createWithDefault(4000) + .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT) val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") @@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging { def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES) - def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION) + def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT) def tableRelationCacheSize: Int = getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala index a0f1a64b0ab0..9d0a41661bea 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala @@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite { assert(reducedCode.body === "/*project_c4*/") } - test("removing extra new lines and comments") { - val code = - """ - |/* - | * multi - | * line - | * comments - | */ - | - |public function() { - |/*comment*/ - | /*comment_with_space*/ - |code_body - |//comment - |code_body - | //comment_with_space - | - |code_body - |} - """.stripMargin - - val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code) - assert(reducedCode === - """ - |public function() { - |code_body - |code_body - |code_body - |} - """.stripMargin) - } - testCase("basic example") { """ |class A { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 268ccfa4edfa..9073d599ac43 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -380,16 +380,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def doExecute(): RDD[InternalRow] = { val (ctx, cleanedSource) = doCodeGen() - if (ctx.isTooLongGeneratedFunction) { - logWarning("Found too long generated codes and JIT optimization might not work, " + - "Whole-stage codegen disabled for this plan, " + - "You can change the config spark.sql.codegen.MaxFunctionLength " + - "to adjust the function length limit:\n " - + s"$treeString") - return child.execute() - } // try to compile and fallback if it failed - try { + val (_, maxCodeSize) = try { CodeGenerator.compile(cleanedSource) } catch { case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback => @@ -397,6 +389,17 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString") return child.execute() } + + // Check if compiled code has a too large function + if (maxCodeSize > sqlContext.conf.hugeMethodLimit) { + logWarning(s"Found too long generated codes and JIT optimization might 
not work: " + + s"the bytecode size was $maxCodeSize, this value went over the limit " + + s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " + + s"for this plan. To avoid this, you can raise the limit " + + s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString") + return child.execute() + } + val references = ctx.references.toArray val durationMs = longMetric("pipelineTime") @@ -405,7 +408,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co assert(rdds.size <= 2, "Up to two input RDDs can be supported") if (rdds.length == 1) { rdds.head.mapPartitionsWithIndex { (index, iter) => - val clazz = CodeGenerator.compile(cleanedSource) + val (clazz, _) = CodeGenerator.compile(cleanedSource) val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator] buffer.init(index, Array(iter)) new Iterator[InternalRow] { @@ -424,7 +427,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co // a small hack to obtain the correct partition index }.mapPartitionsWithIndex { (index, zippedIter) => val (leftIter, rightIter) = zippedIter.next() - val clazz = CodeGenerator.compile(cleanedSource) + val (clazz, _) = CodeGenerator.compile(cleanedSource) val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator] buffer.init(index, Array(leftIter, rightIter)) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala index da3464328191..ae600c1ffae8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera new CodeAndComment(codeBody, ctx.getPlaceHolderToComments())) logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}") - CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator] + val (clazz, _) = CodeGenerator.compile(code) + clazz.generate(Array.empty).asInstanceOf[ColumnarIterator] } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index beeee6a97c8d..aaa77b3ee620 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.{Column, Dataset, Row} -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack} -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -151,7 +149,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } } - def genGroupByCodeGenContext(caseNum: Int): CodegenContext = { + def genGroupByCode(caseNum: Int): CodeAndComment = { val caseExp = (1 to 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index da3464328191..ae600c1ffae8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index beeee6a97c8d..aaa77b3ee620 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{Column, Dataset, Row}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -151,7 +149,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     }
   }
 
-  def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
+  def genGroupByCode(caseNum: Int): CodeAndComment = {
    val caseExp = (1 to caseNum).map { i =>
      s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i"
    }.toList
@@ -176,34 +174,15 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     })
     assert(wholeStageCodeGenExec.isDefined)
-    wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._1
+    wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2
   }
 
-  test("SPARK-21603 check there is a too long generated function") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
-      val ctx = genGroupByCodeGenContext(30)
-      assert(ctx.isTooLongGeneratedFunction === true)
-    }
-  }
-
-  test("SPARK-21603 check there is not a too long generated function") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
-      val ctx = genGroupByCodeGenContext(1)
-      assert(ctx.isTooLongGeneratedFunction === false)
-    }
-  }
-
-  test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> Int.MaxValue.toString) {
-      val ctx = genGroupByCodeGenContext(30)
-      assert(ctx.isTooLongGeneratedFunction === false)
-    }
-  }
-
-  test("SPARK-21603 check there is a too long generated function when threshold is 0") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "0") {
-      val ctx = genGroupByCodeGenContext(1)
-      assert(ctx.isTooLongGeneratedFunction === true)
-    }
+  test("SPARK-21871 check if we can get large code size when compiling too long functions") {
+    val codeWithShortFunctions = genGroupByCode(3)
+    val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
+    assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
+    val codeWithLongFunctions = genGroupByCode(20)
+    val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
+    assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
 }
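The rewritten test compiles the real generated code and asserts that the measured maximum straddles the 8000-byte default: 3 CASE WHEN columns stay under it, 20 go over. Roughly the query shape that `genGroupByCode` builds (illustrative only, not the suite's exact code; `spark` is an existing `SparkSession`):

```scala
// Each extra CASE WHEN column is inlined into one generated aggregate
// method, so that method's bytecode size grows with caseNum.
val caseNum = 20
val caseExp = (1 to caseNum).map { i =>
  s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i"
}
spark.range(10)
  .selectExpr(caseExp: _*)
  .selectExpr((1 to caseNum).map(i => s"sum(v$i)"): _*)
  .collect()
```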
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 691fa9ac5e1e..aca1be01fa3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -24,6 +24,7 @@ import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
 import org.apache.spark.sql.execution.vectorized.AggregateHashMap
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -301,10 +302,10 @@ class AggregateBenchmark extends BenchmarkBase {
     */
   }
 
-  ignore("max function length of wholestagecodegen") {
+  ignore("max function bytecode size of wholestagecodegen") {
     val N = 20 << 15
 
-    val benchmark = new Benchmark("max function length of wholestagecodegen", N)
+    val benchmark = new Benchmark("max function bytecode size", N)
     def f(): Unit = sparkSession.range(N)
       .selectExpr(
         "id",
@@ -333,33 +334,34 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase("codegen = F") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 10000") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000")
+    benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
       f()
     }
 
     benchmark.run()
 
    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns) Relative
-    ----------------------------------------------------------------------------------------------
-    codegen = F                                    462 /  533          1.4         704.4       1.0X
-    codegen = T maxLinesPerFunction = 10000       3444 / 3447          0.2        5255.3       0.1X
-    codegen = T maxLinesPerFunction = 1500         447 /  478          1.5         682.1       1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                                    709 /  803          0.9        1082.1       1.0X
+    codegen = T hugeMethodLimit = 10000           3485 / 3548          0.2        5317.7       0.2X
+    codegen = T hugeMethodLimit = 1500             636 /  701          1.0         969.9       1.1X
     */
   }
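For reference, the size being compared throughout is the length of each method's Code attribute, which `updateAndGetCompilationStats` reads out of Janino's in-memory `ClassFile` through reflection (the `code` field is private). A standalone sketch of the same measurement, mirroring the reflective calls used in the patch (Janino's internal layout could change across versions):

```scala
import java.io.ByteArrayInputStream
import scala.collection.JavaConverters._
import org.codehaus.janino.util.ClassFile

// Given the raw bytes of one compiled class, return the bytecode size of
// every method body, the same way updateAndGetCompilationStats measures it.
def methodCodeSizes(classBytes: Array[Byte]): Seq[Int] = {
  val codeAttrClass = Class.forName("org.codehaus.janino.util.ClassFile$CodeAttribute")
  val codeField = codeAttrClass.getDeclaredField("code")
  codeField.setAccessible(true)
  val cf = new ClassFile(new ByteArrayInputStream(classBytes))
  cf.methodInfos.asScala.flatMap { method =>
    method.getAttributes().collect {
      case a if a.getClass.getName == codeAttrClass.getName =>
        codeField.get(a).asInstanceOf[Array[Byte]].length
    }
  }.toSeq
}
```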