Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ object CodeFormatter {
}
new CodeAndComment(code.result().trim(), map)
}

/**
 * Strips comments and blank lines from a piece of generated source text.
 *
 * Block comments and line comments are removed first; a line comment is only matched when it
 * is terminated by a newline, and that newline is removed together with the comment. Afterwards
 * every run of newlines separated only by whitespace is collapsed into a single newline.
 *
 * @param input the source text to clean up
 * @return `input` without comments and without blank lines
 */
def stripExtraNewLinesAndComments(input: String): String = {
  // First alternative: a /*comment*/ plus the spaces/tabs directly in front of it.
  val blockCommentAlternative = """([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|"""
  // Second alternative: a //comment up to and including its terminating newline.
  val lineCommentAlternative = """([ |\t]*?\/\/[\s\S]*?\n)"""
  val anyComment = (blockCommentAlternative + lineCommentAlternative).r

  anyComment
    .replaceAllIn(input, "")
    // Collapse blank (or whitespace-only) lines into a single line break.
    .replaceAll("""\n\s*\n""", "\n")
}
}

private class CodeFormatter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,20 +373,6 @@ class CodegenContext {
*/
private val placeHolderToComments = new mutable.HashMap[String, String]

/**
 * Checks the length, in lines, of every function registered in `classFunctions`.
 *
 * Comments and blank lines are stripped from each function body before its newlines are
 * counted.
 *
 * @return true if at least one function has more lines than
 *         spark.sql.codegen.maxLinesPerFunction, false otherwise.
 */
def isTooLongGeneratedFunction: Boolean = {
  // A function is "too long" when its comment-free body has more line breaks than the limit.
  def exceedsLineLimit(functionCode: String): Boolean = {
    val strippedCode = CodeFormatter.stripExtraNewLinesAndComments(functionCode)
    strippedCode.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
  }

  classFunctions.values.exists(functionsOfClass => functionsOfClass.values.exists(exceedsLineLimit))
}

/**
* Returns a term name that is unique within this instance of a `CodegenContext`.
*/
Expand Down Expand Up @@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
}

object CodeGenerator extends Logging {

// This is the value of HugeMethodLimit in the OpenJDK JVM settings. It is used as the default
// of the spark.sql.codegen.hugeMethodLimit config, i.e. the maximum bytecode size allowed for
// a single compiled Java function before whole-stage codegen is deactivated for a plan.
// NOTE(review): presumably HotSpot does not JIT-compile methods above this size - confirm.
val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000

/**
* Compile the Java source code into a Java class, using Janino.
*
* @return a pair of a generated class and the max bytecode size of generated functions.
*/
def compile(code: CodeAndComment): GeneratedClass = try {
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add @return to explain what is returned.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

cache.get(code)
} catch {
// Cache.get() may wrap the original exception. See the following URL
Expand All @@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*/
private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
val evaluator = new ClassBodyEvaluator()

// A special classloader used to wrap the actual parent classloader of
Expand Down Expand Up @@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
s"\n${CodeFormatter.format(code)}"
})

try {
val maxCodeSize = try {
evaluator.cook("generated.java", code.body)
recordCompilationStats(evaluator)
updateAndGetCompilationStats(evaluator)
} catch {
case e: JaninoRuntimeException =>
val msg = s"failed to compile: $e"
Expand All @@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
throw new CompileException(msg, e.getLocation)
}
evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]

(evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
}

/**
* Records the generated class and method bytecode sizes by inspecting janino private fields.
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
* Also, this method updates the metrics information.
*/
private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
// First retrieve the generated classes.
val classes = {
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
Expand All @@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
val codeAttrField = codeAttr.getDeclaredField("code")
codeAttrField.setAccessible(true)
classes.foreach { case (_, classBytes) =>
val codeSizes = classes.flatMap { case (_, classBytes) =>
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
try {
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
cf.methodInfos.asScala.foreach { method =>
method.getAttributes().foreach { a =>
if (a.getClass.getName == codeAttr.getName) {
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
}
val stats = cf.methodInfos.asScala.flatMap { method =>
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
byteCodeSize
}
}
Some(stats)
} catch {
case NonFatal(e) =>
logWarning("Error calculating stats of compiled class.", e)
None
}
}
}.flatten

codeSizes.max
}

/**
Expand All @@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
private val cache = CacheBuilder.newBuilder()
.maximumSize(100)
.build(
new CacheLoader[CodeAndComment, GeneratedClass]() {
override def load(code: CodeAndComment): GeneratedClass = {
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
val (clazz, _) = CodeGenerator.compile(code)
val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")

val c = CodeGenerator.compile(code)
c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -575,15 +576,15 @@ object SQLConf {
"disable logging or -1 to apply no limit.")
.createWithDefault(1000)

val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
.internal()
.doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
"When the generated function exceeds this threshold, " +
.doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
"codegen. When the compiled function exceeds this threshold, " +
"the whole-stage codegen is deactivated for this subtree of the current query plan. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This threshold is not for whole-stage only, right? It is misleading.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. I'll brush it up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

"The default value 4000 is the max length of byte code JIT supported " +
"for a single function(8000) divided by 2.")
s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
"this is a limit in the OpenJDK JVM implementation.")
.intConf
.createWithDefault(4000)
.createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
Expand Down Expand Up @@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {

def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)

def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite {
assert(reducedCode.body === "/*project_c4*/")
}

// Verifies that CodeFormatter.stripExtraNewLinesAndComments drops block comments, line
// comments (with or without leading whitespace) and blank lines, keeping only the code lines.
test("removing extra new lines and comments") {
// Input: a multi-line block comment, inline block comments, line comments and a blank line
// mixed in with three `code_body` lines.
val code =
"""
|/*
| * multi
| * line
| * comments
| */
|
|public function() {
|/*comment*/
| /*comment_with_space*/
|code_body
|//comment
|code_body
| //comment_with_space
|
|code_body
|}
""".stripMargin

val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
// Expected: only the function header, the three code lines and the closing brace remain.
assert(reducedCode ===
"""
|public function() {
|code_body
|code_body
|code_body
|}
""".stripMargin)
}

testCase("basic example") {
"""
|class A {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,23 +380,26 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co

override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
if (ctx.isTooLongGeneratedFunction) {
logWarning("Found too long generated codes and JIT optimization might not work, " +
"Whole-stage codegen disabled for this plan, " +
"You can change the config spark.sql.codegen.MaxFunctionLength " +
"to adjust the function length limit:\n "
+ s"$treeString")
return child.execute()
}
// try to compile and fallback if it failed
try {
val (_, maxCodeSize) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
// We should already saw the error message
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
return child.execute()
}

// Check if compiled code has a too large function
if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
logWarning(s"Found too long generated codes and JIT optimization might not work: " +
s"the bytecode size was $maxCodeSize, this value went over the limit " +
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disable " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disable -> disabled

s"for this plan. To avoid this, you can set the limit " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set -> raise. Then remove "higher".

s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key} higher:\n$treeString")
return child.execute()
}

val references = ctx.references.toArray

val durationMs = longMetric("pipelineTime")
Expand All @@ -405,7 +408,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
rdds.head.mapPartitionsWithIndex { (index, iter) =>
val clazz = CodeGenerator.compile(cleanedSource)
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(iter))
new Iterator[InternalRow] {
Expand All @@ -424,7 +427,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
// a small hack to obtain the correct partition index
}.mapPartitionsWithIndex { (index, zippedIter) =>
val (leftIter, rightIter) = zippedIter.next()
val clazz = CodeGenerator.compile(cleanedSource)
val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(leftIter, rightIter))
new Iterator[InternalRow] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}")

CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator]
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
}
}
Loading