Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,15 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
}
}

/**
* Java bytecode statistics of a compiled class by Janino.
*/
case class ByteCodeStats(maxMethodCodeSize: Int, maxConstPoolSize: Int, numInnerClasses: Int)

object ByteCodeStats {
val UNAVAILABLE = ByteCodeStats(-1, -1, -1)
}

object CodeGenerator extends Logging {

// This is the default value of HugeMethodLimit in the OpenJDK HotSpot JVM,
Expand All @@ -1220,6 +1229,9 @@ object CodeGenerator extends Logging {
// The max valid length of method parameters in JVM.
final val MAX_JVM_METHOD_PARAMS_LENGTH = 255

// The max number of constant pool entries in JVM.
final val MAX_JVM_CONSTANT_POOL_SIZE = 65535

// This is the threshold over which the methods in an inner class are grouped in a single
// method which is going to be called by the outer class instead of the many small ones
final val MERGE_SPLIT_METHODS_THRESHOLD = 3
Expand All @@ -1242,9 +1254,9 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*
* @return a pair of a generated class and the max bytecode size of generated functions.
* @return a pair of a generated class and the bytecode statistics of generated functions.
*/
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
cache.get(code)
} catch {
// Cache.get() may wrap the original exception. See the following URL
Expand All @@ -1257,7 +1269,7 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*/
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
val evaluator = new ClassBodyEvaluator()

// A special classloader used to wrap the actual parent classloader of
Expand Down Expand Up @@ -1296,7 +1308,7 @@ object CodeGenerator extends Logging {
s"\n${CodeFormatter.format(code)}"
})

val maxCodeSize = try {
val codeStats = try {
evaluator.cook("generated.java", code.body)
updateAndGetCompilationStats(evaluator)
} catch {
Expand All @@ -1314,14 +1326,15 @@ object CodeGenerator extends Logging {
throw new CompileException(msg, e.getLocation)
}

(evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
(evaluator.getClazz().getConstructor().newInstance().asInstanceOf[GeneratedClass], codeStats)
}

/**
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
* Returns the bytecode statistics (max method bytecode size, max constant pool size, and
* # of inner classes) of generated classes by inspecting Janino classes.
* Also, this method updates the metrics information.
*/
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
// First retrieve the generated classes.
val classes = {
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
Expand All @@ -1336,11 +1349,13 @@ object CodeGenerator extends Logging {
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
val codeAttrField = codeAttr.getDeclaredField("code")
codeAttrField.setAccessible(true)
val codeSizes = classes.flatMap { case (_, classBytes) =>
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
val codeStats = classes.map { case (_, classBytes) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to make the code more readable, by

val (classSizes, maxMethodSizes, constPoolSize) = classes.map....unzip3
ByteCodeStats(
  maxClassCodeSize = classSizes.max,
  maxMethodCodeSize = maxMethodSizes.max,
  maxConstPoolSize = constPoolSize.max,
  // Minus 2 for `GeneratedClass` and an outer-most generated class
  numInnerClasses = classSizes.size - 2)

val classCodeSize = classBytes.length
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
try {
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
val stats = cf.methodInfos.asScala.flatMap { method =>
val constPoolSize = cf.getConstantPoolSize
val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
Expand All @@ -1353,19 +1368,20 @@ object CodeGenerator extends Logging {
byteCodeSize
}
}
Some(stats)
(methodCodeSizes.max, constPoolSize)
} catch {
case NonFatal(e) =>
logWarning("Error calculating stats of compiled class.", e)
None
(-1, -1)
}
}.flatten

if (codeSizes.nonEmpty) {
codeSizes.max
} else {
0
}

val (maxMethodSizes, constPoolSize) = codeStats.unzip
ByteCodeStats(
maxMethodCodeSize = maxMethodSizes.max,
maxConstPoolSize = constPoolSize.max,
// Minus 2 for `GeneratedClass` and an outer-most generated class
numInnerClasses = classes.size - 2)
}

/**
Expand All @@ -1380,8 +1396,8 @@ object CodeGenerator extends Logging {
private val cache = CacheBuilder.newBuilder()
.maximumSize(SQLConf.get.codegenCacheMaxEntries)
.build(
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -213,7 +214,7 @@ class QueryExecution(
*
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenToSeq(): Seq[(String, String)] = {
def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {
org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
// try to compile and fallback if it failed
val (_, maxCodeSize) = try {
val (_, compiledCodeStats) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
Expand All @@ -698,9 +698,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
}

// Check if compiled code has a too large function
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
if (compiledCodeStats.maxMethodCodeSize > sqlContext.conf.hugeMethodLimit) {
logInfo(s"Found too long generated codes and JIT optimization might not work: " +
s"the bytecode size ($maxCodeSize) is above the limit " +
s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " +
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package org.apache.spark.sql.execution
import java.util.Collections

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
Expand Down Expand Up @@ -81,11 +82,20 @@ package object debug {
def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
val codegenSeq = codegenStringSeq(plan)
append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
val usedConstPoolRatio = if (codeStats.maxConstPoolSize > 0) {
val rt = 100.0 * codeStats.maxConstPoolSize / CodeGenerator.MAX_JVM_CONSTANT_POOL_SIZE
"(%.2f%% used)".format(rt)
} else {
""
}
val codeStatsStr = s"maxMethodCodeSize:${codeStats.maxMethodCodeSize}; " +
s"maxConstantPoolSize:${codeStats.maxConstPoolSize}$usedConstPoolRatio; " +
s"numInnerClasses:${codeStats.numInnerClasses}"
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
append(subtree)
append("\nGenerated code:\n")
append(s"${code}\n")
append(s"$code\n")
}
}

Expand All @@ -95,7 +105,7 @@ package object debug {
* @param plan the query plan for codegen
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = {
def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
plan transform {
case s: WholeStageCodegenExec =>
Expand All @@ -105,7 +115,13 @@ package object debug {
}
codegenSubtrees.toSeq.map { subtree =>
val (_, source) = subtree.doCodeGen()
(subtree.toString, CodeFormatter.format(source))
val codeStats = try {
CodeGenerator.compile(source)._2
} catch {
case NonFatal(_) =>
ByteCodeStats.UNAVAILABLE
}
(subtree.toString, CodeFormatter.format(source), codeStats)
}
}

Expand All @@ -130,7 +146,7 @@ package object debug {
* @param query the streaming query for codegen
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
def codegenStringSeq(query: StreamingQuery): Seq[(String, String, ByteCodeStats)] = {
val w = asStreamExecution(query)
if (w.lastExecution != null) {
codegenStringSeq(w.lastExecution.executedPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
Expand Down Expand Up @@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {

ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
val codeWithShortFunctions = genGroupByCode(3)
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
val (_, ByteCodeStats(maxCodeSize1, _, _)) = CodeGenerator.compile(codeWithShortFunctions)
assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
val codeWithLongFunctions = genGroupByCode(50)
val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
val (_, ByteCodeStats(maxCodeSize2, _, _)) = CodeGenerator.compile(codeWithLongFunctions)
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ package org.apache.spark.sql.execution.debug

import java.io.ByteArrayOutputStream

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.execution.{CodegenSupport, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.StructType

class DebuggingSuite extends SharedSparkSession {

Expand All @@ -46,7 +52,7 @@ class DebuggingSuite extends SharedSparkSession {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall{ case (subtree, code) =>
assert(res.forall{ case (subtree, code, _) =>
subtree.contains("Range") && code.contains("Object[]")})
}

Expand Down Expand Up @@ -90,4 +96,41 @@ class DebuggingSuite extends SharedSparkSession {
| id LongType: {}
|""".stripMargin))
}

case class DummyCodeGeneratorPlan(useInnerClass: Boolean)
extends CodegenSupport with LeafExecNode {
override def output: Seq[Attribute] = StructType.fromDDL("d int").toAttributes
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(spark.sparkContext.emptyRDD[InternalRow])
override protected def doExecute(): RDD[InternalRow] = sys.error("Not used")
override protected def doProduce(ctx: CodegenContext): String = {
if (useInnerClass) {
val innerClassName = ctx.freshName("innerClass")
ctx.addInnerClass(
s"""
|public class $innerClassName {
| public $innerClassName() {}
|}
""".stripMargin)
}
""
}
}

test("Prints bytecode statistics in debugCodegen") {
Seq(true, false).foreach { useInnerClass =>
val plan = WholeStageCodegenExec(DummyCodeGeneratorPlan(useInnerClass))(codegenStageId = 0)

val genCodes = codegenStringSeq(plan)
assert(genCodes.length == 1)
val (_, _, codeStats) = genCodes.head
val expectedNumInnerClasses = if (useInnerClass) 1 else 0
assert(codeStats.maxMethodCodeSize > 0 && codeStats.maxConstPoolSize > 0 &&
codeStats.numInnerClasses == expectedNumInnerClasses)

val debugCodegenStr = codegenString(plan)
assert(debugCodegenStr.contains("maxMethodCodeSize:"))
assert(debugCodegenStr.contains("maxConstantPoolSize:"))
assert(debugCodegenStr.contains("numInnerClasses:"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall { case (_, code) =>
assert(res.forall { case (_, code, _) =>
(code.contains("* Codegend pipeline") == flag) &&
(code.contains("// input[") == flag)
})
Expand Down