Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,20 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
}
}

/**
* Java bytecode statistics of a compiled class by Janino.
*/
case class ByteCodeStats(maxClassCodeSize: Int, maxMethodCodeSize: Int, maxConstPoolSize: Int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about adding also the number od inner classes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks nice.


object ByteCodeStats {

val unavailable = ByteCodeStats(-1, -1, -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
val unavailable = ByteCodeStats(-1, -1, -1)
val UNAVAILABLE = ByteCodeStats(-1, -1, -1)


def apply(codeStats: (Int, Int, Int)): ByteCodeStats = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmmh..do we really need this?

ByteCodeStats(codeStats._1, codeStats._2, codeStats._3)
}
}

object CodeGenerator extends Logging {

// This is the default value of HugeMethodLimit in the OpenJDK HotSpot JVM,
Expand Down Expand Up @@ -1244,7 +1258,7 @@ object CodeGenerator extends Logging {
*
* @return a pair of a generated class and the max bytecode size of generated functions.
*/
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
cache.get(code)
} catch {
// Cache.get() may wrap the original exception. See the following URL
Expand All @@ -1257,7 +1271,7 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*/
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
val evaluator = new ClassBodyEvaluator()

// A special classloader used to wrap the actual parent classloader of
Expand Down Expand Up @@ -1318,10 +1332,11 @@ object CodeGenerator extends Logging {
}

/**
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
* Also, this method updates the metrics information.
* Returns the bytecode statistics (max class bytecode size, max method bytecode size, and
* max constant pool size) of generated classes by inspecting janino private fields.inspecting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: inspecting janino private fields.inspecting janino private fields seems weird.
Also: could we always spell "Janino" as such?

Copy link
Member Author

@maropu maropu Sep 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh... I'll fix soon. Thanks!

* janino private fields. Also, this method updates the metrics information.
*/
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): ByteCodeStats = {
// First retrieve the generated classes.
val classes = {
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
Expand All @@ -1336,11 +1351,13 @@ object CodeGenerator extends Logging {
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
val codeAttrField = codeAttr.getDeclaredField("code")
codeAttrField.setAccessible(true)
val codeSizes = classes.flatMap { case (_, classBytes) =>
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
val codeStats = classes.map { case (_, classBytes) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to make the code more readable, by

val (classSizes, maxMethodSizes, constPoolSize) = classes.map....unzip3
ByteCodeStats(
  maxClassCodeSize = classSizes.max,
  maxMethodCodeSize = maxMethodSizes.max,
  maxConstPoolSize = constPoolSize.max,
  // Minus 2 for `GeneratedClass` and an outer-most generated class
  numInnerClasses = classSizes.size - 2)

val classCodeSize = classBytes.length
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classCodeSize)
try {
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
val stats = cf.methodInfos.asScala.flatMap { method =>
val constPoolSize = cf.getConstantPoolSize
val methodCodeSizes = cf.methodInfos.asScala.flatMap { method =>
method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
Expand All @@ -1353,19 +1370,17 @@ object CodeGenerator extends Logging {
byteCodeSize
}
}
Some(stats)
(classCodeSize, methodCodeSizes.max, constPoolSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious: now that we've got a nice new ByteCodeStats type, why use a tuple here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong reason... I just did because I avoided the longer statement in https://github.com/apache/spark/pull/25766/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cR1382;

    ByteCodeStats(codeStats.reduce { case (v1, v2) =>
      (Math.max(v1.maxClassCodeSize, v2.maxClassCodeSize),
        Math.max(v1.maxMethodCodeSize, v2.maxMethodCodeSize),
        Math.max(v1.maxConstPoolSize, v2.maxConstPoolSize))
    })

If there are other reviewers who like that, I'll update.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find named fields much more readable than _1 _2 _3. In fact even with tuples I may have written the code like:

ByteCodeStats(codeStats.reduce { case ((maxClassCodeSize1, maxMethodCodeSize1, maxConstPoolSize), (maxClassCodeSize2, maxMethodCodeSize2, maxConstPoolSize2)) =>
      (Math.max(maxClassCodeSize1, maxClassCodeSize2),
        Math.max(maxMethodCodeSize1, maxMethodCodeSize2),
        Math.max(maxConstPoolSize1, maxConstPoolSize2))
    })

and...I'd say the v1.maxClassCodeSize version looks better here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the latest code? I added a new metric (# of inner classes), so using a tuple in that part is ok?

} catch {
case NonFatal(e) =>
logWarning("Error calculating stats of compiled class.", e)
None
(classCodeSize, -1, -1)
}
}.flatten

if (codeSizes.nonEmpty) {
codeSizes.max
} else {
0
}

ByteCodeStats(codeStats.reduce[(Int, Int, Int)] { case (v1, v2) =>
(Math.max(v1._1, v2._1), Math.max(v1._2, v2._2), Math.max(v1._3, v2._3))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this direction.

May we see these values regarding different classes? Is it better to show class name and method name, too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this pr prints statistics per a whole-stage codegen entry, so the current one looks ok to me.

})
}

/**
Expand All @@ -1380,8 +1395,8 @@ object CodeGenerator extends Logging {
private val cache = CacheBuilder.newBuilder()
.maximumSize(SQLConf.get.codegenCacheMaxEntries)
.build(
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
Expand Down Expand Up @@ -213,7 +214,7 @@ class QueryExecution(
*
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenToSeq(): Seq[(String, String)] = {
def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {
org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
// try to compile and fallback if it failed
val (_, maxCodeSize) = try {
val (_, compiledCodeStats) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
Expand All @@ -698,9 +698,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
}

// Check if compiled code has a too large function
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
if (compiledCodeStats.maxMethodCodeSize > sqlContext.conf.hugeMethodLimit) {
logInfo(s"Found too long generated codes and JIT optimization might not work: " +
s"the bytecode size ($maxCodeSize) is above the limit " +
s"the bytecode size (${compiledCodeStats.maxMethodCodeSize}) is above the limit " +
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package org.apache.spark.sql.execution
import java.util.Collections

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeFormatter, CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
Expand Down Expand Up @@ -81,11 +82,14 @@ package object debug {
def writeCodegen(append: String => Unit, plan: SparkPlan): Unit = {
val codegenSeq = codegenStringSeq(plan)
append(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n")
for (((subtree, code), i) <- codegenSeq.zipWithIndex) {
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n")
for (((subtree, code, codeStats), i) <- codegenSeq.zipWithIndex) {
val codeStatsStr = s"maxClassCodeSize:${codeStats.maxClassCodeSize} " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what about separate them by semicolumn?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You suggested this?

== Subtree 1 / 2 (maxClassCodeSize:3689; maxMethodCodeSize:226; maxConstantPoolSize:167) ==

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, actually if you prefer any other separator, I just find it more readable with a separator

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think that suggested one looks ok to me. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you

s"maxMethodCodeSize:${codeStats.maxMethodCodeSize} " +
s"maxConstantPoolSize:${codeStats.maxConstPoolSize}"
append(s"== Subtree ${i + 1} / ${codegenSeq.size} ($codeStatsStr) ==\n")
append(subtree)
append("\nGenerated code:\n")
append(s"${code}\n")
append(s"$code\n")
}
}

Expand All @@ -95,7 +99,7 @@ package object debug {
* @param plan the query plan for codegen
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenStringSeq(plan: SparkPlan): Seq[(String, String)] = {
def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
plan transform {
case s: WholeStageCodegenExec =>
Expand All @@ -105,7 +109,13 @@ package object debug {
}
codegenSubtrees.toSeq.map { subtree =>
val (_, source) = subtree.doCodeGen()
(subtree.toString, CodeFormatter.format(source))
val codeStats = try {
CodeGenerator.compile(source)._2
} catch {
case NonFatal(_) =>
ByteCodeStats.unavailable
}
(subtree.toString, CodeFormatter.format(source), codeStats)
}
}

Expand All @@ -130,7 +140,7 @@ package object debug {
* @param query the streaming query for codegen
* @return Sequence of WholeStageCodegen subtrees and corresponding codegen
*/
def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
def codegenStringSeq(query: StreamingQuery): Seq[(String, String, ByteCodeStats)] = {
val w = asStreamExecution(query)
if (w.lastExecution != null) {
codegenStringSeq(w.lastExecution.executedPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
Expand Down Expand Up @@ -213,10 +213,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession {

ignore("SPARK-21871 check if we can get large code size when compiling too long functions") {
val codeWithShortFunctions = genGroupByCode(3)
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
val (_, ByteCodeStats(_, maxCodeSize1, _)) = CodeGenerator.compile(codeWithShortFunctions)
assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
val codeWithLongFunctions = genGroupByCode(50)
val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
val (_, ByteCodeStats(_, maxCodeSize2, _)) = CodeGenerator.compile(codeWithLongFunctions)
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DebuggingSuite extends SharedSparkSession {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall{ case (subtree, code) =>
assert(res.forall{ case (subtree, code, _) =>
subtree.contains("Range") && code.contains("Object[]")})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
.queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall { case (_, code) =>
assert(res.forall { case (_, code, _) =>
(code.contains("* Codegend pipeline") == flag) &&
(code.contains("// input[") == flag)
})
Expand Down